Posted to commits@impala.apache.org by bo...@apache.org on 2021/11/03 12:22:22 UTC

[impala] 02/02: IMPALA-10777: Enable min/max filtering for Iceberg partitions

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9ed4b3689784670532e840c5cb0389bdd9d5c0e8
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Oct 5 14:35:59 2021 +0200

    IMPALA-10777: Enable min/max filtering for Iceberg partitions
    
    This patch enables min/max filters for Iceberg columns that
    participate in table partitioning. The min/max filters are
    evaluated at the Parquet row group level. This means that it
    is still slower than dynamic partition pruning (which doesn't
    even need to open the files), but much faster than no pruning at all.
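
    As a minimal illustration of the setup this patch targets (the table
    and column names below are made up for this sketch, not taken from
    the patch): Iceberg partitioning is hidden, so a partition source
    column's values are still stored in the Parquet data files, which
    lets a min/max filter built from a join be checked against each row
    group's column statistics:

     -- 'sales' is a hypothetical example table; 'date_dim' is TPC-DS.
     create table sales (id bigint, sold_date date, amount decimal(10,2))
       partitioned by spec (month(sold_date))
       stored as iceberg;

     -- The join key is the partition source column, so the min/max
     -- filter from the build side can prune row groups of 'sales'.
     select count(*)
     from sales s join date_dim d on s.sold_date = d.d_date
     where d.d_year = 2000;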
    
    Performance
    
    I used the following query to measure performance on a scale factor 10
    TPC-DS dataset:
    
     select i_item_id, sum(ss_ext_sales_price) total_sales
     from
         store_sales,
         date_dim,
         customer_address,
         item
     where i_item_id in (select i_item_id
                         from item
                         where i_color in ('orchid','chiffon','lace'))
       and ss_item_sk      = i_item_sk
       and ss_sold_date_sk = d_date_sk
       and d_year          = 2000
       and d_moy           = 1
       and ss_addr_sk      = ca_address_sk
       and ca_gmt_offset   = -8
     group by i_item_id
    
    The above query took the following times to execute:
    
    Regular Parquet table: 1.16s
    Iceberg table without min/max filters: 4.39s
    Iceberg table with min/max filters: 1.77s
    
    Testing:
     * added e2e test
     * a planner test could not be added because Iceberg tables behave
       differently during planner tests (due to some hacks that need
       refactoring)
    
    Change-Id: I51b53188c6da7eeebfeae385e1de31ace0980cac
    Reviewed-on: http://gerrit.cloudera.org:8080/17960
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scanner.h                         |  7 ++++
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 21 +++++-------
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  6 ++--
 be/src/runtime/runtime-filter.h                    |  5 +++
 common/thrift/PlanNodes.thrift                     |  4 +++
 .../org/apache/impala/catalog/FeIcebergTable.java  | 13 ++++++++
 .../java/org/apache/impala/catalog/FeTable.java    |  6 ++++
 .../impala/planner/RuntimeFilterGenerator.java     | 39 ++++++++++++++++++----
 .../queries/QueryTest/min_max_filters.test         | 10 ++++++
 9 files changed, 89 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6402bbf..6f80904 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -196,6 +196,13 @@ class HdfsScanner {
     return -1;
   }
 
+  /// Return the runtime filter at index 'filter_idx'.
+  const RuntimeFilter* GetFilter(int filter_idx) {
+    DCHECK_GE(filter_idx, 0);
+    DCHECK_LT(filter_idx, filter_ctxs_.size());
+    return filter_ctxs_[filter_idx]->filter;
+  }
+
   /// Scanner subclasses must implement these static functions as well.  Unfortunately,
   /// c++ does not allow static virtual functions.
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 4068aab..a47e1de 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -726,13 +726,12 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
     /// Find the index of the filter that is common in data structure
     /// filter_ctxs_ and filter_stats_.
     int idx = FindFilterIndex(filter_id);
-    DCHECK(idx >= 0);
 
-    if (IsBoundByPartitionColumn(idx)) {
-      continue;
-    }
+    const RuntimeFilter* filter = GetFilter(idx);
+    // We skip row group filtering if the column is not present in the data files.
+    if (!filter->IsColumnInDataFile(GetScanNodeId())) continue;
 
-    MinMaxFilter* minmax_filter = FindMinMaxFilter(idx);
+    MinMaxFilter* minmax_filter = GetMinMaxFilter(filter);
 
     VLOG(3) << "Try to filter out a rowgroup via overlap predicate filter: "
             << " fid=" << filter_id
@@ -1354,12 +1353,9 @@ void HdfsParquetScanner::GetMinMaxSlotsForOverlapPred(
   *max_slot = min_max_tuple_->GetSlot(max_slot_desc->tuple_offset());
 }
 
-MinMaxFilter* HdfsParquetScanner::FindMinMaxFilter(int filter_idx) {
-  if (filter_idx >= 0 && filter_idx < filter_ctxs_.size()) {
-    const RuntimeFilter* filter = filter_ctxs_[filter_idx]->filter;
-    if (filter && filter->is_min_max_filter()) {
-      return filter->get_min_max();
-    }
+MinMaxFilter* HdfsParquetScanner::GetMinMaxFilter(const RuntimeFilter* filter) {
+  if (filter && filter->is_min_max_filter()) {
+    return filter->get_min_max();
   }
   return nullptr;
 }
@@ -1461,7 +1457,8 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
     int slot_idx = desc.slot_index;
 
     int filter_idx = FindFilterIndex(filter_id);
-    MinMaxFilter* minmax_filter = FindMinMaxFilter(filter_idx);
+    const RuntimeFilter* filter = GetFilter(filter_idx);
+    MinMaxFilter* minmax_filter = GetMinMaxFilter(filter);
     if (!minmax_filter || !IsFilterWorthyForOverlapCheck(filter_idx)) {
       continue;
     }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index bf243a4..25c2c02 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -644,9 +644,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Return the overlap predicate descs from the HDFS scan plan.
   const vector<TOverlapPredicateDesc>& GetOverlapPredicateDescs();
 
-  /// Find and return the min/max filter at filter_ctx_[filter_idx].
-  /// Return nullptr if no min/max filter is found at that location.
-  MinMaxFilter* FindMinMaxFilter(int filter_idx);
+  /// Return the min/max filter contained in 'filter'. Return nullptr if
+  /// 'filter' is null or is not a min/max filter.
+  MinMaxFilter* GetMinMaxFilter(const RuntimeFilter* filter);
 
   /// Return true when the filter at filter_ctx_[filter_idx] is bound by a
   /// partition column and false otherwise.
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 45d869f..fa8eccf 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -122,6 +122,11 @@ class RuntimeFilter {
     return filter_desc().targets[target_ndx].is_bound_by_partition_columns;
   }
 
+  bool IsColumnInDataFile(int plan_id) const {
+    int target_ndx = filter_desc().planid_to_target_ndx.at(plan_id);
+    return filter_desc().targets[target_ndx].is_column_in_data_file;
+  }
+
   /// Frequency with which to check for filter arrival in WaitForArrival()
   static const int SLEEP_PERIOD_MS;
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 46db62d..e137250 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -120,6 +120,10 @@ struct TRuntimeFilterTargetDesc {
 
   // Indicates if the low and high value in column stats are present
   10: optional bool is_min_max_value_present
+
+  // Indicates if the column is actually stored in the data files (unlike partition
+  // columns in regular Hive tables)
+  11: optional bool is_column_in_data_file
 }
 
 enum TRuntimeFilterType {
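
To make the new is_column_in_data_file flag concrete: in a regular
Hive-style table a partition column's values exist only in the directory
structure, so there are no Parquet column statistics to check a min/max
filter against, while an Iceberg partition source column is an ordinary
data column. A hypothetical sketch (table names are illustrative only):

 -- Hive-style: 'part_col' lives only in paths like .../part_col=5/,
 -- not inside the Parquet files; the flag would be false for it.
 create table hive_style (id bigint)
   partitioned by (part_col int) stored as parquet;

 -- Iceberg: 'sold_date' is stored in every data file even though it
 -- also drives partitioning; the flag would be true for it.
 create table ice_style (id bigint, sold_date date)
   partitioned by spec (day(sold_date)) stored as iceberg;
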
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c25664d..e362808 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -247,6 +247,19 @@ public interface FeIcebergTable extends FeFsTable {
     return getFeFsTable().getHostIndex();
   }
 
+  @Override /* FeTable */
+  default boolean isComputedPartitionColumn(Column col) {
+    Preconditions.checkState(col instanceof IcebergColumn);
+    IcebergColumn iceCol = (IcebergColumn)col;
+    IcebergPartitionSpec spec = getDefaultPartitionSpec();
+    if (spec == null || !spec.hasPartitionFields()) return false;
+
+    for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
+      if (iceCol.getFieldId() == partField.getSourceId()) return true;
+    }
+    return false;
+  }
+
   /**
    * Current snapshot id of the table.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index 0d4f3a1..fe5f9f1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -111,6 +111,12 @@ public interface FeTable {
   boolean isClusteringColumn(Column c);
 
   /**
+   * Return true when the column is used to compute partition values, e.g. when
+   * it is the source column of an Iceberg partition transform.
+   */
+  default boolean isComputedPartitionColumn(Column c) { return false; }
+
+  /**
    * Case-insensitive lookup.
    *
    * @return null if the column with 'name' is not found.
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 28cf07f..9ca74b8 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -46,6 +46,7 @@ import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
@@ -232,14 +233,17 @@ public final class RuntimeFilterGenerator {
       // The low and high value of the column on which the filter is applied
       public final TColumnValue lowValue;
       public final TColumnValue highValue;
+      // Indicates if the column's values are actually stored in the data files
+      public final boolean isColumnInDataFile;
 
       public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
-          boolean isBoundByPartitionColumns, boolean isLocalTarget, TColumnValue lowValue,
-          TColumnValue highValue) {
+          boolean isBoundByPartitionColumns, boolean isColumnInDataFile,
+          boolean isLocalTarget, TColumnValue lowValue, TColumnValue highValue) {
         Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
         node = targetNode;
         expr = targetExpr;
         this.isBoundByPartitionColumns = isBoundByPartitionColumns;
+        this.isColumnInDataFile = isColumnInDataFile;
         this.isLocalTarget = isLocalTarget;
         this.lowValue = lowValue;
         this.highValue = highValue;
@@ -256,6 +260,9 @@ public final class RuntimeFilterGenerator {
         tFilterTarget.setTarget_expr_slotids(tSlotIds);
         tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
         tFilterTarget.setIs_local_target(isLocalTarget);
+        if (node instanceof HdfsScanNode) {
+          tFilterTarget.setIs_column_in_data_file(isColumnInDataFile);
+        }
         if (node instanceof KuduScanNode) {
           // assignRuntimeFilters() only assigns KuduScanNode targets if the target expr
           // is a slot ref, possibly with an implicit cast, pointing to a column.
@@ -290,6 +297,7 @@ public final class RuntimeFilterGenerator {
         return output.append("Target Id: " + node.getId() + " ")
             .append("Target expr: " + expr.debugString() + " ")
             .append("Partition columns: " + isBoundByPartitionColumns)
+            .append("Is column stored in data files: " + isColumnInDataFile)
             .append("Is local: " + isLocalTarget)
             .append("lowValue: " + (lowValue != null ? lowValue.toString() : -1))
             .append("highValue: " + (highValue != null ? highValue.toString() : -1))
@@ -924,6 +932,8 @@ public final class RuntimeFilterGenerator {
       boolean isBoundByPartitionColumns = isBoundByPartitionColumns(analyzer, targetExpr,
           scanNode);
       if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
+      boolean isColumnInDataFile = isColumnInDataFile(scanNode.getTupleDesc().getTable(),
+          isBoundByPartitionColumns);
       boolean isLocalTarget = isLocalTarget(filter, scanNode);
       if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;
 
@@ -1008,7 +1018,8 @@ public final class RuntimeFilterGenerator {
       }
       RuntimeFilter.RuntimeFilterTarget target =
           new RuntimeFilter.RuntimeFilterTarget(scanNode, targetExpr,
-              isBoundByPartitionColumns, isLocalTarget, lowValue, highValue);
+              isBoundByPartitionColumns, isColumnInDataFile, isLocalTarget,
+              lowValue, highValue);
       filter.addTarget(target);
     }
 
@@ -1033,13 +1044,13 @@ public final class RuntimeFilterGenerator {
     Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
     TupleDescriptor baseTblDesc = targetNode.getTupleDesc();
     FeTable tbl = baseTblDesc.getTable();
-    if (tbl.getNumClusteringCols() == 0) return false;
+    if (tbl.getNumClusteringCols() == 0 && !(tbl instanceof FeIcebergTable)) return false;
     List<SlotId> sids = new ArrayList<>();
     targetExpr.getIds(null, sids);
     for (SlotId sid : sids) {
-      SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
-      if (slotDesc.getColumn() == null
-          || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) {
+      Column col = analyzer.getSlotDesc(sid).getColumn();
+      if (col == null) return false;
+      if (!tbl.isClusteringColumn(col) && !tbl.isComputedPartitionColumn(col)) {
         return false;
       }
     }
@@ -1047,6 +1058,20 @@ public final class RuntimeFilterGenerator {
   }
 
   /**
+   * Return true if the column is actually stored in the data files. E.g.
+   * partition columns of regular Hive tables are usually not stored in them.
+   */
+  private static boolean isColumnInDataFile(FeTable tbl,
+      boolean isBoundByPartitionColumns) {
+    // Non-partition values are always stored in the data files.
+    if (!isBoundByPartitionColumns) return true;
+    // Iceberg uses hidden partitioning which means all columns are stored in the data
+    // files.
+    if (tbl instanceof FeIcebergTable) return true;
+    return false;
+  }
+
+  /**
    * Computes the target expr for a specified runtime filter 'filter' to be applied at
    * the scan node with target tuple descriptor 'targetTid'.
    */
diff --git a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
index e1248bb..f14e02a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
@@ -425,3 +425,13 @@ on a.date_col = b.date_col and b.date_col = c.date_col;
 aggregation(SUM, ProbeRows): 48
 row_regex: .*1 of 1 Runtime Filter Published.*
 ====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select * from functional_parquet.iceberg_partitioned i1,
+              functional_parquet.iceberg_partitioned i2
+where i1.action = i2.action and
+      i1.id = i2.id and
+      i2.event_time = '2020-01-01 10:00:00';
+---- RUNTIME_PROFILE
+row_regex:.* RF00.\[min_max\] -> i1\.action.*
+====