You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/07/16 01:30:54 UTC
[impala] 01/02: IMPALA-10763: Min/max filters should be enabled on
Z-order sorted columns
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 36d8e6766e3f1ce133a86b3edc5fee34e5e71cc7
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu Jun 24 20:18:50 2021 -0400
IMPALA-10763: Min/max filters should be enabled on Z-order sorted columns
This patch enables min/max filtering on any Z-order sort-by columns
by default.
Since the column stats for a row group or a page are computed from the
column values stored in that row group or page, the current
infrastructure for min/max filtering works for Z-order out of the box.
The fact that these column values are ordered by Z-order is
orthogonal to how min/max filtering works.
By default, the new feature is enabled. Set the existing control knob
minmax_filter_sorted_columns to false to turn it off.
Testing
1. Added new z-order related sort column tests in
overlap_min_max_filters_on_sorted_columns.test;
2. Ran core-test.
Change-Id: I2a528ffbd0e333721ef38b4be7d4ddcdbf188adf
Reviewed-on: http://gerrit.cloudera.org:8080/17635
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/service/query-options.cc | 2 +-
.../java/org/apache/impala/catalog/FeFsTable.java | 34 ++++++--
.../org/apache/impala/planner/HdfsScanNode.java | 43 +++++-----
.../overlap_min_max_filters_on_sorted_columns.test | 95 ++++++++++++++++++++++
4 files changed, 146 insertions(+), 28 deletions(-)
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e92e81f..4e431a2 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1097,7 +1097,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
}
case TImpalaQueryOptions::DELETE_STATS_IN_TRUNCATE: {
query_options->__set_delete_stats_in_truncate(IsTrue(value));
- break;
+ break;
}
case TImpalaQueryOptions::MINMAX_FILTER_SORTED_COLUMNS: {
query_options->__set_minmax_filter_sorted_columns(IsTrue(value));
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 74a539b..0aadb73 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -290,19 +290,37 @@ public interface FeFsTable extends FeTable {
ListMap<TNetworkAddress> getHostIndex();
/**
- * Check if 'col_name' names the leading sort by column by searching the 'sort.columns'
- * table property.
+ * Check if 'col_name' appears in the list of sort-by columns by searching the
+ * 'sort.columns' table property and return the index in the list if so. Return
+ * -1 otherwise.
*/
- default boolean isLeadingSortByColumn(String col_name) {
+ default int getSortByColumnIndex(String col_name) {
// Get the names of all sort by columns (specified in the SORT BY clause in
// CREATE TABLE DDL) from TBLPROPERTIES.
Map<String, String> parameters = getMetaStoreTable().getParameters();
- if (parameters == null) return false;
+ if (parameters == null) return -1;
String sort_by_columns_string = parameters.get("sort.columns");
- if (sort_by_columns_string == null) return false;
- String[] sort_by_columns = sort_by_columns_string.split(",", -1);
- if (sort_by_columns == null) return false;
- return sort_by_columns.length > 0 && sort_by_columns[0].equals(col_name);
+ if (sort_by_columns_string == null) return -1;
+ String[] sort_by_columns = sort_by_columns_string.split(",");
+ if (sort_by_columns == null) return -1;
+ for (int i = 0; i < sort_by_columns.length; i++) {
+ if (sort_by_columns[i].equals(col_name)) return i;
+ }
+ return -1;
+ }
+
+ /**
+ * Check if 'col_name' names the leading sort-by column.
+ */
+ default boolean isLeadingSortByColumn(String col_name) {
+ return getSortByColumnIndex(col_name) == 0;
+ }
+
+ /**
+ * Check if 'col_name' appears in the list of sort-by columns.
+ */
+ default boolean isSortByColumn(String col_name) {
+ return getSortByColumnIndex(col_name) >= 0;
}
/**
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9ef52bb..00af40e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -91,6 +91,7 @@ import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
+import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.util.BitUtil;
import org.apache.impala.util.ExecutorMembershipSnapshot;
@@ -725,21 +726,34 @@ public class HdfsScanNode extends ScanNode {
/**
* Determine if a runtime filter should be allowed given the relevant query options.
*/
- private boolean allowRuntimeFilter(TQueryOptions queryOptions,
- boolean isBoundByPartitionColumns, boolean isLeadingLexicalSortedColumn) {
+ private boolean allowMinMaxFilter(FeTable table, Column column,
+ TQueryOptions queryOptions, boolean isBoundByPartitionColumns) {
+ if (column == null || table == null || !(table instanceof FeFsTable)) return false;
+ FeFsTable feFsTable = (FeFsTable) table;
+
boolean minmaxOnPartitionColumns = queryOptions.isMinmax_filter_partition_columns();
boolean minmaxOnSortedColumns = queryOptions.isMinmax_filter_sorted_columns();
+ TSortingOrder sortOrder = feFsTable.getSortOrderForSortByColumn();
+ if (sortOrder != null) {
+ // The table is sorted.
+ if (sortOrder == TSortingOrder.LEXICAL) {
+ // If the table is sorted in lexical order, allow it if the column is a
+ // leading sort-by column and filtering on sorted column is enabled.
+ return feFsTable.isLeadingSortByColumn(column.getName()) && minmaxOnSortedColumns;
+ } else {
+ // Must be Z-order. Allow it if it is one of the sort-by columns and filtering
+ // on sorted column is enabled.
+ Preconditions.checkState(sortOrder == TSortingOrder.ZORDER);
+ return feFsTable.isSortByColumn(column.getName()) && minmaxOnSortedColumns;
+ }
+ }
+
// Allow min/max filters on partition columns only when enabled.
if (isBoundByPartitionColumns) {
return minmaxOnPartitionColumns;
}
- // Allow min/max filters on sorted columns only when enabled.
- if (isLeadingLexicalSortedColumn) {
- return minmaxOnSortedColumns;
- }
-
// Allow min/max filters if the threshold value > 0.0.
return queryOptions.getMinmax_filter_threshold() > 0.0;
}
@@ -766,19 +780,10 @@ public class HdfsScanNode extends ScanNode {
return false;
}
- boolean isLeadingLexicalSortedColumn = false;
Column column = slotRefInScan.getDesc().getColumn();
- if (column != null) {
- TupleDescriptor tDesc = slotRefInScan.getDesc().getParent();
- FeTable table = tDesc.getTable();
- if (table != null && table instanceof FeFsTable) {
- isLeadingLexicalSortedColumn =
- ((FeFsTable) table).isLeadingSortByColumn(column.getName())
- && ((FeFsTable) table).IsLexicalSortByColumn();
- }
- }
- if (!allowRuntimeFilter(analyzer.getQueryOptions(), isBoundByPartitionColumns,
- isLeadingLexicalSortedColumn)) {
+ FeTable table = slotRefInScan.getDesc().getParent().getTable();
+ if (!allowMinMaxFilter(
+ table, column, analyzer.getQueryOptions(), isBoundByPartitionColumns)) {
return false;
}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
index ffb1a18..d30af02 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_sorted_columns.test
@@ -252,3 +252,98 @@ where a.a = b.float_col;
---- RUNTIME_PROFILE
aggregation(SUM, NumRuntimeFilteredPages): 0
====
+---- QUERY
+###################################################
+# Create a version of store_sales with z-order sort
+# on primary key and load it with 27587 rows.
+###################################################
+drop table if exists store_sales_zorder;
+create table store_sales_zorder (
+ss_sold_time_sk INT,
+ss_item_sk BIGINT,
+ss_customer_sk INT,
+ss_cdemo_sk INT,
+ss_hdemo_sk INT,
+ss_addr_sk INT,
+ss_store_sk INT,
+ss_promo_sk INT,
+ss_ticket_number BIGINT,
+ss_quantity INT,
+ss_wholesale_cost DECIMAL(7,2),
+ss_list_price DECIMAL(7,2),
+ss_sales_price DECIMAL(7,2),
+ss_ext_discount_amt DECIMAL(7,2),
+ss_ext_sales_price DECIMAL(7,2),
+ss_ext_wholesale_cost DECIMAL(7,2),
+ss_ext_list_price DECIMAL(7,2),
+ss_ext_tax DECIMAL(7,2),
+ss_coupon_amt DECIMAL(7,2),
+ss_net_paid DECIMAL(7,2),
+ss_net_paid_inc_tax DECIMAL(7,2),
+ss_net_profit DECIMAL(7,2),
+PRIMARY KEY (ss_item_sk, ss_ticket_number)
+)
+PARTITIONED BY (ss_sold_date_sk INT)
+sort by zorder(ss_item_sk, ss_ticket_number)
+STORED AS PARQUET;
+set PARQUET_PAGE_ROW_COUNT_LIMIT=1000;
+insert into store_sales_zorder partition (ss_sold_date_sk)
+select * from tpcds_parquet.store_sales;
+====
+---- QUERY
+###################################################
+# A minmax filter should be generated for the
+# z-order column ss_item_sk out of the box.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_item_sk = b.ss_item_sk
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* RF001\[min_max\] -. .\.ss_item_sk.*
+====
+---- QUERY
+###################################################
+# A minmax filter should be generated for the
+# z-order column ss_ticket_number out of the box.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_ticket_number = b.ss_ticket_number
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* RF001\[min_max\] -. .\.ss_ticket_number.*
+====
+---- QUERY
+###################################################
+# A minmax filter should not be generated for
+# column ss_cdemo_sk which is not z-ordered.
+###################################################
+set explain_level=3;
+explain select straight_join a.ss_sold_time_sk from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.store_sales b
+on a.ss_cdemo_sk = b.ss_cdemo_sk
+where b.ss_customer_sk < 10 and b.ss_addr_sk < 20;
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:.* RF001\[min_max\] -. .\.ss_cdemo_sk.*
+====
+---- QUERY
+###################################################
+# Run a query that demonstrates the min/max filter
+# helps reduce the number of pages:
+# sum(NumRuntimeFilteredPages) = 28
+###################################################
+set minmax_filtering_level=page;
+set minmax_filter_threshold=0.9;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select straight_join count(a.ss_sold_time_sk) from
+store_sales_zorder a join [SHUFFLE] tpcds_parquet.item b
+on a.ss_item_sk = b.i_item_sk
+where i_manufact_id = 1 and i_current_price < 1.0;
+---- RESULTS
+540
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRuntimeFilteredPages): 28
+====