Posted to commits@impala.apache.org by st...@apache.org on 2024/01/31 05:09:17 UTC

(impala) 01/02: IMPALA-12765: Balance consecutive partitions better for Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 62a3168eca955119ad7c01b1f4d91a9702efd397
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Jan 29 19:11:52 2024 +0100

    IMPALA-12765: Balance consecutive partitions better for Iceberg tables
    
    During remote read scheduling, Impala does the following:
    
    Non-Iceberg tables
     * The scheduler processes the scan ranges in partition key order
     * The scheduler selects N executors as candidates
     * The scheduler chooses the executor from the candidates based on
       minimum number of assigned bytes
     * So consecutive partitions are more likely to be assigned to
       different executors
    
    Iceberg tables
     * The scheduler processes the scan ranges in random order
     * The scheduler selects N executors as candidates
     * The scheduler chooses the executor from the candidates based on
       minimum number of assigned bytes
     * So consecutive partitions (by partition key order) are assigned
       randomly, i.e. there's a higher chance of clustering (see the
       sketch after this list)
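
    A minimal sketch of that selection rule, in illustrative Java with
    hypothetical names (the real scheduler is part of Impala's backend
    and is not shown here; this is an assumption-based sketch, not the
    actual implementation):

     import java.util.Arrays;
     import java.util.List;

     // Illustration only: given the candidate executors picked for one
     // scan range, choose the one with the fewest assigned bytes so far.
     final class MinBytesSelection {
       static int chooseFromCandidates(List<Integer> candidates,
           long[] assignedBytes) {
         int best = candidates.get(0);
         for (int candidate : candidates) {
           if (assignedBytes[candidate] < assignedBytes[best]) {
             best = candidate;
           }
         }
         return best;
       }

       public static void main(String[] args) {
         long[] assignedBytes = {100, 40, 70};
         // Candidates 0 and 2: executor 2 has fewer assigned bytes, so
         // this prints 2.
         System.out.println(
             chooseFromCandidates(Arrays.asList(0, 2), assignedBytes));
       }
     }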
    
    With this patch, IcebergScanNode orders its file descriptors based on
    their paths, so scheduling is more balanced for consecutive
    partitions. This is especially important for queries that prune
    partitions via runtime filters (e.g. due to a JOIN): even if the scan
    ranges as a whole are scheduled evenly, the scan ranges that survive
    the runtime filters can still end up clustered on certain executors.
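
    As a hedged illustration of why ordering by path helps: for a
    partitioned Iceberg table the data file paths typically embed the
    partition values as directory names, so sorting by path roughly
    follows partition-key order. The paths and class below are made up
    for this sketch; only the sort-by-path idea mirrors the patch:

     import java.util.Arrays;
     import java.util.Collections;
     import java.util.List;

     // Illustration only: hypothetical file paths, not taken from the
     // patch or from a real table.
     final class PathOrdering {
       public static void main(String[] args) {
         List<String> paths = Arrays.asList(
             ".../data/inv_date_sk=2450817/00002-a.parq",
             ".../data/inv_date_sk=2450815/00000-b.parq",
             ".../data/inv_date_sk=2450816/00001-c.parq");
         // Sorting walks the partitions in key order and keeps files of
         // the same partition next to each other.
         Collections.sort(paths);
         paths.forEach(System.out::println);
       }
     }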
    
    E.g. TPC-DS Q22 has the following JOIN and WHERE predicates:
    
     inv_date_sk=d_date_sk and
     d_month_seq between 1199 and 1199 + 11
    
    The Inventory table is partitioned by column inv_date_sk, and the
    rows of the joined table are filtered by 'd_month_seq between 1199
    and 1199 + 11'. This means that only a range of partitions of the
    Inventory table is needed, but that range is only revealed at
    runtime. Scheduling neighbouring partitions to different executors
    means that the surviving partitions are spread across executors more
    evenly.
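
    The effect can be approximated with a toy model (illustration only:
    equally sized scan ranges, one file per partition, and a plain
    least-assigned-bytes rule over all executors instead of the real
    candidate selection). With a sorted processing order the contiguous
    range of surviving partitions lands in a round-robin-like pattern;
    with a random processing order the same range may cluster on fewer
    executors:

     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.List;
     import java.util.Random;
     import java.util.TreeMap;

     // Illustration only: a toy model of the scheduling described above.
     final class ConsecutivePartitionSketch {
       public static void main(String[] args) {
         int numPartitions = 12;
         int numExecutors = 3;
         List<Integer> sortedOrder = new ArrayList<>();
         for (int p = 0; p < numPartitions; ++p) sortedOrder.add(p);
         List<Integer> randomOrder = new ArrayList<>(sortedOrder);
         Collections.shuffle(randomOrder, new Random(0));
         TreeMap<Integer, Integer> bySorted = assign(sortedOrder, numExecutors);
         TreeMap<Integer, Integer> byRandom = assign(randomOrder, numExecutors);
         // Pretend only partitions 4..7 survive the runtime filter.
         System.out.println("sorted: " + bySorted.subMap(4, 8));
         System.out.println("random: " + byRandom.subMap(4, 8));
       }

       // Assign each partition (one equally sized scan range) to the
       // executor with the fewest assigned bytes so far, processing the
       // partitions in the given order.
       static TreeMap<Integer, Integer> assign(List<Integer> order,
           int numExecutors) {
         long[] assignedBytes = new long[numExecutors];
         TreeMap<Integer, Integer> partitionToExecutor = new TreeMap<>();
         for (int partition : order) {
           int best = 0;
           for (int e = 1; e < numExecutors; ++e) {
             if (assignedBytes[e] < assignedBytes[best]) best = e;
           }
           assignedBytes[best] += 1;
           partitionToExecutor.put(partition, best);
         }
         return partitionToExecutor;
       }
     }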
    
    Testing:
     * e2e test
    
    Change-Id: I60773965ecbb4d8e659db158f1f0ac76086d5578
    Reviewed-on: http://gerrit.cloudera.org:8080/20973
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/IcebergScanNode.java | 16 ++++++-
 tests/query_test/test_iceberg.py                   | 50 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 10b5a5390..a5abefea0 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -52,8 +52,14 @@ import com.google.common.collect.Lists;
 public class IcebergScanNode extends HdfsScanNode {
   private final static Logger LOG = LoggerFactory.getLogger(IcebergScanNode.class);
 
+  // List of files needed to be scanned by this scan node. The list is sorted in case of
+  // partitioned tables, so partition range scans are scheduled more evenly.
+  // See IMPALA-12765 for details.
   private List<FileDescriptor> fileDescs_;
 
+  // Indicates that the files in 'fileDescs_' are sorted.
+  private boolean filesAreSorted_ = false;
+
   // Conjuncts on columns not involved in IDENTITY-partitioning. Subset of 'conjuncts_',
   // but this does not include conjuncts on IDENTITY-partitioned columns, because such
   // conjuncts have already been pushed to Iceberg to filter out partitions/files, so
@@ -89,6 +95,12 @@ public class IcebergScanNode extends HdfsScanNode {
     Preconditions.checkState(partitions_.size() == 1);
 
     fileDescs_ = fileDescs;
+    if (((FeIcebergTable)tblRef.getTable()).isPartitioned()) {
+      // Let's order the file descriptors for better scheduling.
+      // See IMPALA-12765 for details.
+      Collections.sort(fileDescs_);
+      filesAreSorted_ = true;
+    }
     nonIdentityConjuncts_ = nonIdentityConjuncts;
     //TODO IMPALA-11577: optimize file format counting
     boolean hasParquet = false;
@@ -203,7 +215,9 @@ public class IcebergScanNode extends HdfsScanNode {
 
     // Ensure a consistent ordering of files for repeatable runs.
     List<FileDescriptor> orderedFds = Lists.newArrayList(fileDescs_);
-    Collections.sort(orderedFds);
+    if (!filesAreSorted_) {
+      Collections.sort(orderedFds);
+    }
 
     Preconditions.checkState(partitions_.size() == 1);
     FeFsPartition part = partitions_.get(0);
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 769850b17..6baea6255 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1022,7 +1022,7 @@ class TestIcebergTable(IcebergTestSuite):
     """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for
     Iceberg tables."""
     def collect_split_stats(profile):
-      splits = [l.strip() for l in profile.splitlines() if "Hdfs split stats" in l]
+      splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s]
       splits.sort()
       return splits
 
@@ -1041,6 +1041,54 @@ class TestIcebergTable(IcebergTestSuite):
         split_stats = collect_split_stats(profile)
         assert ref_split_stats == split_stats
 
+  def test_scheduling_partitioned_tables(self, vector, unique_database):
+    """IMPALA-12765: Balance consecutive partitions better for Iceberg tables"""
+    # We are setting the replica_preference query option in this test, so let's create a
+    # local impala client.
+    inventory_tbl = "inventory_ice"
+    item_tbl = "item_ice"
+    date_dim_tbl = "date_dim_ice"
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute("use " + unique_database)
+      impalad_client.execute("set replica_preference=remote")
+      impalad_client.execute("""
+          CREATE TABLE {}
+          PARTITIONED BY SPEC (inv_date_sk)
+          STORED BY ICEBERG
+          AS SELECT * from tpcds_partitioned_parquet_snap.inventory;
+          """.format(inventory_tbl))
+      impalad_client.execute("""
+          CREATE TABLE {}
+          STORED BY ICEBERG
+          AS SELECT * from tpcds_partitioned_parquet_snap.item;
+          """.format(item_tbl))
+      impalad_client.execute("""
+          CREATE TABLE {}
+          STORED BY ICEBERG
+          AS SELECT * from tpcds_partitioned_parquet_snap.date_dim;
+          """.format(date_dim_tbl))
+      q22_result = impalad_client.execute("""
+          select i_product_name, i_brand, i_class, i_category,
+                avg(inv_quantity_on_hand) qoh
+          from inventory_ice, date_dim_ice, item_ice
+          where inv_date_sk=d_date_sk and
+                inv_item_sk=i_item_sk and
+                d_month_seq between 1199 and 1199 + 11
+          group by rollup(i_product_name, i_brand, i_class, i_category)
+          order by qoh, i_product_name, i_brand, i_class, i_category
+          limit 100
+          """)
+      profile = q22_result.runtime_profile
+      # "Files rejected:" contains the number of files being rejected by runtime
+      # filters. With IMPALA-12765 we should see similar numbers for each executor.
+      files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile)
+      avg_files_rejected = int(files_rejected_array[0])
+      THRESHOLD = 3
+      for files_rejected_str in files_rejected_array:
+        files_rejected = int(files_rejected_str)
+        if files_rejected != 0:
+          assert abs(avg_files_rejected - files_rejected) < THRESHOLD
+
   def test_in_predicate_push_down(self, vector, unique_database):
     self.execute_query("SET RUNTIME_FILTER_MODE=OFF")
     self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,