You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/07/16 01:30:54 UTC

[impala] 01/02: IMPALA-10763: Min/max filters should be enabled on Z-order sorted columns

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 36d8e6766e3f1ce133a86b3edc5fee34e5e71cc7
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu Jun 24 20:18:50 2021 -0400

    IMPALA-10763: Min/max filters should be enabled on Z-order sorted columns
    
    This patch enables min/max filtering on any Z-order sort-by columns
    by default.
    
    Since the column stats for a row group or a page are computed from the
    column values stored in the row group or the page, the current
    infrastructure for min/max filtering works for Z-order out of the box.
    The fact that these column values are ordered by Z-order is
    orthogonal to the work of min/max filtering.
    
    By default, the new feature is enabled. Set the existing control knob
    minmax_filter_sorted_columns to false to turn it off.
    
    Testing
      1. Added new z-order related sort column tests in
         overlap_min_max_filters_on_sorted_columns.test;
      2. Ran core-test.
    
    Change-Id: I2a528ffbd0e333721ef38b4be7d4ddcdbf188adf
    Reviewed-on: http://gerrit.cloudera.org:8080/17635
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    |  2 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  | 34 ++++++--
 .../org/apache/impala/planner/HdfsScanNode.java    | 43 +++++-----
 .../overlap_min_max_filters_on_sorted_columns.test | 95 ++++++++++++++++++++++
 4 files changed, 146 insertions(+), 28 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e92e81f..4e431a2 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1097,7 +1097,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
       }
       case TImpalaQueryOptions::DELETE_STATS_IN_TRUNCATE: {
         query_options->__set_delete_stats_in_truncate(IsTrue(value));
-         break;
+        break;
       }
       case TImpalaQueryOptions::MINMAX_FILTER_SORTED_COLUMNS: {
         query_options->__set_minmax_filter_sorted_columns(IsTrue(value));
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 74a539b..0aadb73 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -290,19 +290,37 @@ public interface FeFsTable extends FeTable {
   ListMap<TNetworkAddress> getHostIndex();
 
   /**
-   * Check if 'col_name' names the leading sort by column by searching the 'sort.columns'
-   * table property.
+   * Check if 'col_name' appears in the list of sort-by columns by searching the
+   * 'sort.columns' table property and return the index in the list if so. Return
+   * -1 otherwise.
    */
-  default boolean isLeadingSortByColumn(String col_name) {
+  default int getSortByColumnIndex(String col_name) {
     // Get the names of all sort by columns (specified in the SORT BY clause in
     // CREATE TABLE DDL) from TBLPROPERTIES.
     Map<String, String> parameters = getMetaStoreTable().getParameters();
-    if (parameters == null) return false;
+    if (parameters == null) return -1;
     String sort_by_columns_string = parameters.get("sort.columns");
-    if (sort_by_columns_string == null) return false;
-    String[] sort_by_columns = sort_by_columns_string.split(",", -1);
-    if (sort_by_columns == null) return false;
-    return sort_by_columns.length > 0 && sort_by_columns[0].equals(col_name);
+    if (sort_by_columns_string == null) return -1;
+    String[] sort_by_columns = sort_by_columns_string.split(",");
+    if (sort_by_columns == null) return -1;
+    for (int i = 0; i < sort_by_columns.length; i++) {
+      if (sort_by_columns[i].equals(col_name)) return i;
+    }
+    return -1;
+  }
+
+  /**
+   * Check if 'col_name' names the leading sort-by column.
+   */
+  default boolean isLeadingSortByColumn(String col_name) {
+    return getSortByColumnIndex(col_name) == 0;
+  }
+
+  /**
+   * Check if 'col_name' appears in the list of sort-by columns.
+   */
+  default boolean isSortByColumn(String col_name) {
+    return getSortByColumnIndex(col_name) >= 0;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9ef52bb..00af40e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -91,6 +91,7 @@ import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
@@ -725,21 +726,34 @@ public class HdfsScanNode extends ScanNode {
   /**
    * Determine if a runtime filter should be allowed given the relevant query options.
    */
-  private boolean allowRuntimeFilter(TQueryOptions queryOptions,
-      boolean isBoundByPartitionColumns, boolean isLeadingLexicalSortedColumn) {
+  private boolean allowMinMaxFilter(FeTable table, Column column,
+      TQueryOptions queryOptions, boolean isBoundByPartitionColumns) {
+    if (column == null || table == null || !(table instanceof FeFsTable)) return false;
+    FeFsTable feFsTable = (FeFsTable) table;
+
     boolean minmaxOnPartitionColumns = queryOptions.isMinmax_filter_partition_columns();
     boolean minmaxOnSortedColumns = queryOptions.isMinmax_filter_sorted_columns();
 
+    TSortingOrder sortOrder = feFsTable.getSortOrderForSortByColumn();
+    if (sortOrder != null) {
+      // The table is sorted.
+      if (sortOrder == TSortingOrder.LEXICAL) {
+        // If the table is sorted in lexical order, allow it if the column is the
+        // leading sort-by column and filtering on sorted column is enabled.
+        return feFsTable.isLeadingSortByColumn(column.getName()) && minmaxOnSortedColumns;
+      } else {
+        // Must be Z-order. Allow it if it is one of the sort-by columns and filtering
+        // on sorted column is enabled.
+        Preconditions.checkState(sortOrder == TSortingOrder.ZORDER);
+        return feFsTable.isSortByColumn(column.getName()) && minmaxOnSortedColumns;
+      }
+    }
+
     // Allow min/max filters on partition columns only when enabled.
     if (isBoundByPartitionColumns) {
       return minmaxOnPartitionColumns;
     }
 
-    // Allow min/max filters on sorted columns only when enabled.
-    if (isLeadingLexicalSortedColumn) {
-      return minmaxOnSortedColumns;
-    }
-
     // Allow min/max filters if the threshold value > 0.0.
     return queryOptions.getMinmax_filter_threshold() > 0.0;
   }
@@ -766,19 +780,10 @@ public class HdfsScanNode extends ScanNode {
         return false;
     }
 
-    boolean isLeadingLexicalSortedColumn = false;
     Column column = slotRefInScan.getDesc().getColumn();
-    if (column != null) {
-      TupleDescriptor tDesc = slotRefInScan.getDesc().getParent();
-      FeTable table = tDesc.getTable();
-      if (table != null && table instanceof FeFsTable) {
-        isLeadingLexicalSortedColumn =
-            ((FeFsTable) table).isLeadingSortByColumn(column.getName())
-            && ((FeFsTable) table).IsLexicalSortByColumn();
-      }
-    }
-    if (!allowRuntimeFilter(analyzer.getQueryOptions(), isBoundByPartitionColumns,
-            isLeadingLexicalSortedColumn)) {
+    FeTable table = slotRefInScan.getDesc().getParent().getTable();
+    if (!allowMinMaxFilter(
+            table, column, analyzer.getQueryOptions(), isBoundByPartitionColumns)) {
       return false;
     }
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
index ffb1a18..d30af02 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
@@ -252,3 +252,98 @@ where a.a = b.float_col;
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRuntimeFilteredPages): 0
 ====
+---- QUERY
+###################################################
+# Create a version of store_sales with z-order sort
+# on primary key and load it with 27587 rows.
+###################################################
+drop table if exists store_sales_zorder;
+create table store_sales_zorder (
+ss_sold_time_sk INT,
+ss_item_sk BIGINT,
+ss_customer_sk INT,
+ss_cdemo_sk INT,
+ss_hdemo_sk INT,
+ss_addr_sk INT,
+ss_store_sk INT,
+ss_promo_sk INT,
+ss_ticket_number BIGINT,
+ss_quantity INT,
+ss_wholesale_cost DECIMAL(7,2),
+ss_list_price DECIMAL(7,2),
+ss_sales_price DECIMAL(7,2),
+ss_ext_discount_amt DECIMAL(7,2),
+ss_ext_sales_price DECIMAL(7,2),
+ss_ext_wholesale_cost DECIMAL(7,2),
+ss_ext_list_price DECIMAL(7,2),
+ss_ext_tax DECIMAL(7,2),
+ss_coupon_amt DECIMAL(7,2),
+ss_net_paid DECIMAL(7,2),
+ss_net_paid_inc_tax DECIMAL(7,2),
+ss_net_profit DECIMAL(7,2),
+PRIMARY KEY (ss_item_sk, ss_ticket_number)
+)
+PARTITIONED BY (ss_sold_date_sk INT)
+sort by zorder(ss_item_sk, ss_ticket_number)
+STORED AS PARQUET;
+set PARQUET_PAGE_ROW_COUNT_LIMIT=1000;
+insert into store_sales_zorder partition (ss_sold_date_sk)
+select * from tpcds_parquet.store_sales;
+====
+---- QUERY
+###################################################
+# A minmax filter should be generated for the
+# z-order column ss_item_sk out of the box.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_item_sk = b.ss_item_sk
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* RF001\[min_max\] -. .\.ss_item_sk.*
+====
+---- QUERY
+###################################################
+# A minmax filter should be generated for the
+# z-order column ss_ticket_number out of the box.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_ticket_number = b.ss_ticket_number
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* RF001\[min_max\] -. .\.ss_ticket_number.*
+====
+---- QUERY
+###################################################
+# A minmax filter should not be generated for
+# column ss_cdemo_sk which is not z-ordered.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_cdemo_sk = b.ss_cdemo_sk
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:.* RF001\[min_max\] -. .\.ss_cdemo_sk.*
+====
+---- QUERY
+###################################################
+# Run a query that demonstrates the min/max filter
+# helps reduce the # of pages:
+# sum(NumRuntimeFilteredPages) = 28
+###################################################
+set minmax_filtering_level=page;
+set minmax_filter_threshold=0.9;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select straight_join count(a.ss_sold_time_sk) from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.item b
+on a.ss_item_sk = b.i_item_sk
+where i_manufact_id = 1 and i_current_price < 1.0;
+---- RESULTS
+540
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRuntimeFilteredPages): 28
+====