Posted to commits@impala.apache.org by st...@apache.org on 2023/09/05 01:59:52 UTC

[impala] branch master updated (b718d6386 -> b73bc49ea)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from b718d6386 IMPALA-11535: Skip older events in the event processor based on the latestRefreshEventID
     new c49f5d277 IMPALA-12408: Optimize HdfsScanNode.computeScanRangeLocations()
     new a9aaeaa48 IMPALA-12409: Don't allow EXTERNAL Iceberg tables to point to another Iceberg table in Hive catalog
     new b73bc49ea IMPALA-12400: Test expected executors used for planning when no executor groups are healthy

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../analysis/AlterTableSetTblProperties.java       |   4 +
 .../apache/impala/analysis/ComputeStatsStmt.java   |   5 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  17 +--
 .../java/org/apache/impala/catalog/FeFsTable.java  |   9 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../org/apache/impala/catalog/IcebergTable.java    |  29 +++++
 .../impala/catalog/local/LocalFsPartition.java     |   7 +-
 .../org/apache/impala/planner/HdfsScanNode.java    | 133 ++++++++++-----------
 .../org/apache/impala/planner/IcebergScanNode.java |   8 +-
 .../queries/QueryTest/iceberg-catalogs.test        |  38 ------
 .../queries/QueryTest/iceberg-insert.test          |  72 -----------
 .../queries/QueryTest/iceberg-negative.test        |  60 +++++++++-
 tests/custom_cluster/test_executor_groups.py       |  69 +++++++++++
 tests/query_test/test_iceberg.py                   |  27 +++++
 14 files changed, 275 insertions(+), 210 deletions(-)


[impala] 02/03: IMPALA-12409: Don't allow EXTERNAL Iceberg tables to point to another Iceberg table in Hive catalog

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a9aaeaa4896ce025bd9fff2fd60f230a1c1e3733
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Aug 28 19:38:03 2023 +0200

    IMPALA-12409: Don't allow EXTERNAL Iceberg tables to point to another Iceberg table in Hive catalog
    
    This patch forbids creating an EXTERNAL Iceberg table that points
    to another Iceberg table in the Hive Catalog. I.e., the following should
    be forbidden:
    
      CREATE EXTERNAL TABLE ice_ext
      STORED BY ICEBERG
      TBLPROPERTIES ('iceberg.table_identifier'='db.tbl');
    
    Loading such tables should also raise an error. Users need to query
    the original Iceberg tables; alternatively, they can create VIEWs if
    they want to query the tables under a different name.
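    
    For illustration only (not part of this patch), a minimal standalone Java
    sketch of the identifier check described above; the class and method names
    are hypothetical, while the property keys are the ones the patch actually
    inspects (see IcebergTable.verifyTable() in the diff below):
    
      import java.util.Map;
      
      /** Hypothetical sketch of the Hive-catalog identifier check; not the Impala code. */
      final class IcebergIdentifierCheck {
        // Table properties that can redirect a HiveCatalog table to another Iceberg table.
        private static final String[] IDENTIFIER_KEYS = {
            "iceberg.table_identifier", "name", "iceberg.mr.table.identifier"};
      
        // Returns true iff every identifier property is absent or matches the table's own
        // "db.tbl" identifier (case-insensitively); tables that point elsewhere are the
        // ones the patch rejects at load time.
        static boolean pointsToItself(String dbName, String tblName,
            Map<String, String> params) {
          String selfId = dbName + "." + tblName;
          for (String key : IDENTIFIER_KEYS) {
            if (!selfId.equalsIgnoreCase(params.getOrDefault(key, selfId))) return false;
          }
          return true;
        }
      }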
    
    Testing:
     * added e2e tests for CREATE EXTERNAL TABLE
     * added an e2e test for loading such a table
    
    Change-Id: Ifb0d7f0e7ec40fba356bd58b43f68d070432de71
    Reviewed-on: http://gerrit.cloudera.org:8080/20429
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../analysis/AlterTableSetTblProperties.java       |  4 ++
 .../apache/impala/analysis/CreateTableStmt.java    | 17 ++---
 .../org/apache/impala/catalog/IcebergTable.java    | 29 +++++++++
 .../queries/QueryTest/iceberg-catalogs.test        | 38 ------------
 .../queries/QueryTest/iceberg-insert.test          | 72 ----------------------
 .../queries/QueryTest/iceberg-negative.test        | 60 +++++++++++++++++-
 tests/query_test/test_iceberg.py                   | 27 ++++++++
 7 files changed, 123 insertions(+), 124 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 85ed35108..46d6d271e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -25,6 +25,8 @@ import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
@@ -155,6 +157,8 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
     icebergPropertyCheck(IcebergTable.ICEBERG_CATALOG);
     icebergPropertyCheck(IcebergTable.ICEBERG_CATALOG_LOCATION);
     icebergPropertyCheck(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
+    icebergPropertyCheck(Catalogs.NAME);
+    icebergPropertyCheck(InputFormatConfig.TABLE_IDENTIFIER);
     icebergPropertyCheck(IcebergTable.METADATA_LOCATION);
     if (tblProperties_.containsKey(IcebergTable.ICEBERG_FILE_FORMAT)) {
       icebergTableFormatCheck(tblProperties_.get(IcebergTable.ICEBERG_FILE_FORMAT));
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index bc378c5d5..8f8422555 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -775,6 +775,10 @@ public class CreateTableStmt extends StatementBase {
       analyzer_.addWarning("The table property 'external.table.purge' will be set "
           + "to 'TRUE' on newly created managed Iceberg tables.");
     }
+    if (isExternalWithNoPurge() && IcebergUtil.isHiveCatalog(getTblProperties())) {
+        throw new AnalysisException("Cannot create EXTERNAL Iceberg table in the " +
+            "Hive Catalog.");
+    }
   }
 
   private void validateTableInHiveCatalog() throws AnalysisException {
@@ -782,19 +786,6 @@ public class CreateTableStmt extends StatementBase {
       throw new AnalysisException(String.format("%s cannot be set for Iceberg table " +
           "stored in hive.catalog", IcebergTable.ICEBERG_CATALOG_LOCATION));
     }
-    if (isExternalWithNoPurge()) {
-      String tableId = getTblProperties().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
-      if (tableId == null || tableId.isEmpty()) {
-        tableId = getTblProperties().get(Catalogs.NAME);
-      }
-      if (tableId == null || tableId.isEmpty()) {
-        throw new AnalysisException(String.format("Table property '%s' is necessary " +
-            "for external Iceberg tables stored in hive.catalog. " +
-            "For creating a completely new Iceberg table, use 'CREATE TABLE' " +
-            "(no EXTERNAL keyword).",
-            IcebergTable.ICEBERG_TABLE_IDENTIFIER));
-      }
-    }
   }
 
   private void validateTableInHadoopCatalog() throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 4c83acf02..2a9f7ade8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -339,6 +343,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
       throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
+    verifyTable(msTbl);
     try {
       // Copy the table to check later if anything has changed.
       msTable_ = msTbl.deepCopy();
@@ -394,6 +399,30 @@ public class IcebergTable extends Table implements FeIcebergTable {
     }
   }
 
+  /**
+   * @throws TableLoadingException when it is unsafe to load the table.
+   */
+  private void verifyTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
+      throws TableLoadingException {
+    if (IcebergUtil.isHiveCatalog(msTbl.getParameters())) {
+      String tableId = IcebergUtil.getIcebergTableIdentifier(
+          msTbl.getDbName(), msTbl.getTableName()).toString();
+      Map<String, String> params = msTbl.getParameters();
+      if (!tableId.equalsIgnoreCase(
+              params.getOrDefault(IcebergTable.ICEBERG_TABLE_IDENTIFIER, tableId)) ||
+          !tableId.equalsIgnoreCase(
+              params.getOrDefault(Catalogs.NAME, tableId)) ||
+          !tableId.equalsIgnoreCase(
+              params.getOrDefault(InputFormatConfig.TABLE_IDENTIFIER, tableId))) {
+        throw new TableLoadingException(String.format(
+            "Table %s cannot be loaded because it is an " +
+            "EXTERNAL table in the HiveCatalog that points to another table. " +
+            "Query the original table instead.",
+            getFullName()));
+      }
+    }
+  }
+
   /**
    * Load schema and partitioning schemes directly from Iceberg.
    */
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
index c9430a306..843c12754 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
@@ -122,41 +122,3 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs/data/labe
 ---- TYPES
 STRING, STRING, STRING, STRING
 ====
----- QUERY
-CREATE EXTERNAL TABLE iceberg_hive_catalogs_ext
-STORED AS ICEBERG
-TBLPROPERTIES('iceberg.catalog'='ice_hive_cat',
-'iceberg.table_identifier'='$DATABASE.iceberg_hive_catalogs');
----- RESULTS
-'Table has been created.'
-====
----- QUERY
-DESCRIBE FORMATTED iceberg_hive_catalogs_ext;
----- RESULTS: VERIFY_IS_SUBSET
-'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
-'','write.format.default','parquet             '
-'','iceberg.table_identifier','$DATABASE.iceberg_hive_catalogs'
-'','name                ','$DATABASE.iceberg_hive_catalogs'
----- TYPES
-string, string, string
-====
----- QUERY
-SELECT * FROM iceberg_hive_catalogs_ext;
----- RESULTS
-'ice',3.14
----- TYPES
-STRING,DECIMAL
-====
----- QUERY
-DROP TABLE iceberg_hive_catalogs_ext;
----- RESULTS
-'Table has been dropped.'
-====
----- QUERY
-REFRESH iceberg_hive_catalogs;
-SELECT * FROM iceberg_hive_catalogs;
----- RESULTS
-'ice',3.14
----- TYPES
-STRING,DECIMAL
-====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
index 5d72d2cdb..0773f0b63 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
@@ -167,84 +167,12 @@ select * from iceberg_hive_cat;
 INT
 ====
 ---- QUERY
-# Query external Iceberg table
-create external table iceberg_hive_cat_ext (i int)
-stored as iceberg
-location '$WAREHOUSE_LOCATION_PREFIX/test-warehouse/$DATABASE.db/iceberg_hive_cat'
-tblproperties('iceberg.catalog'='hive.catalog',
-    'iceberg.table_identifier'='$DATABASE.iceberg_hive_cat');
----- RESULTS
-'Table has been created.'
-====
----- QUERY
-select * from iceberg_hive_cat_ext;
----- RESULTS
-7
----- TYPES
-INT
-====
----- QUERY
-# INSET INTO external Iceberg table stored in HiveCatalog.
-insert into iceberg_hive_cat_ext values (8);
-select * from iceberg_hive_cat_ext;
----- RESULTS
-7
-8
----- TYPES
-INT
-====
----- QUERY
-# Query original table
-refresh iceberg_hive_cat;
-select * from iceberg_hive_cat;
----- RESULTS
-7
-8
----- TYPES
-INT
-====
----- QUERY
-# DROP external Iceberg table
-drop table iceberg_hive_cat_ext
----- RESULTS
-'Table has been dropped.'
-====
----- QUERY
-# Original table is not affected after external table drop.
-refresh iceberg_hive_cat;
-select * from iceberg_hive_cat;
----- RESULTS
-7
-8
----- TYPES
-INT
-====
----- QUERY
-# Create another external Iceberg table
-create external table iceberg_hive_cat_ext_2 (i int)
-stored as iceberg
-location '$WAREHOUSE_LOCATION_PREFIX/test-warehouse/$DATABASE.db/iceberg_hive_cat'
-tblproperties('iceberg.catalog'='hive.catalog',
-    'iceberg.table_identifier'='$DATABASE.iceberg_hive_cat');
-select * from iceberg_hive_cat_ext_2
----- RESULTS
-7
-8
-====
----- QUERY
 # DROP the synchronized Iceberg table (data is purged).
 drop table iceberg_hive_cat
 ---- RESULTS
 'Table has been dropped.'
 ====
 ---- QUERY
-# The data has been purged, so querying the external table fails.
-refresh iceberg_hive_cat_ext_2;
-select * from iceberg_hive_cat_ext_2
----- CATCH
-Table does not exist
-====
----- QUERY
 # Insert into hive catalog with custom location.
 create table iceberg_hive_cat_custom_loc (i int)
 stored as iceberg
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index a3b55ee67..99ac5d090 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -93,7 +93,55 @@ CREATE EXTERNAL TABLE iceberg_hive_tbls_external(
 )
 STORED AS ICEBERG;
 ---- CATCH
-Table property 'iceberg.table_identifier' is necessary for external Iceberg tables stored in hive.catalog. For creating a completely new Iceberg table, use 'CREATE TABLE' (no EXTERNAL keyword).
+Cannot create EXTERNAL Iceberg table in the Hive Catalog.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hive_tbls_external(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.table_identifier'='functional_parquet.iceberg_partitioned');
+---- CATCH
+Cannot create EXTERNAL Iceberg table in the Hive Catalog.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hive_tbls_external(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='hive.catalog',
+'iceberg.table_identifier'='functional_parquet.iceberg_partitioned');
+---- CATCH
+Cannot create EXTERNAL Iceberg table in the Hive Catalog.
+====
+---- QUERY
+CREATE TABLE iceberg_hive_alias(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.table_identifier'='functional_parquet.iceberg_mixed_file_format');
+---- CATCH
+AlreadyExistsException: Table already exists: functional_parquet.iceberg_mixed_file_format
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hive_tbls_external(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='hive.catalog',
+'iceberg.table_identifier'='functional_parquet.iceberg_partitioned');
+---- CATCH
+Cannot create EXTERNAL Iceberg table in the Hive Catalog.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_hive_tbls_external(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('iceberg.catalog'='ice_hive_catalog',
+'iceberg.table_identifier'='functional_parquet.iceberg_partitioned');
+---- CATCH
+Cannot create EXTERNAL Iceberg table in the Hive Catalog.
 ====
 ---- QUERY
 CREATE EXTERNAL TABLE fake_iceberg_table_hadoop_catalog
@@ -249,6 +297,16 @@ ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.table_identi
 AnalysisException: Changing the 'iceberg.table_identifier' table property is not supported for Iceberg table.
 ====
 ---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('name'='fake_db.fake_table');
+---- CATCH
+AnalysisException: Changing the 'name' table property is not supported for Iceberg table.
+====
+---- QUERY
+ALTER TABLE iceberg_table_hadoop_catalog set TBLPROPERTIES('iceberg.mr.table.identifier'='fake_db.fake_table');
+---- CATCH
+AnalysisException: Changing the 'iceberg.mr.table.identifier' table property is not supported for Iceberg table.
+====
+---- QUERY
 ALTER TABLE iceberg_table_hadoop_catalog unset TBLPROPERTIES('iceberg.table_identifier');
 ---- CATCH
 AnalysisException: Unsetting the 'iceberg.table_identifier' table property is not supported for Iceberg table.
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 43f7974ba..02cf985d9 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1086,6 +1086,33 @@ class TestIcebergTable(IcebergTestSuite):
 
     assert parquet_column_name_type_list == iceberg_column_name_type_list
 
+  @SkipIfFS.hive
+  def test_hive_external_forbidden(self, vector, unique_database):
+    tbl_name = unique_database + ".hive_ext"
+    error_msg = ("cannot be loaded because it is an EXTERNAL table in the HiveCatalog "
+        "that points to another table. Query the original table instead.")
+    self.execute_query("create table {0} (i int) stored by iceberg".
+        format(tbl_name))
+    # 'iceberg.table_identifier' can refer to another table
+    self.run_stmt_in_hive("""alter table {0} set tblproperties
+        ('external.table.purge'='false',
+         'iceberg.table_identifier'='functional_iceberg.iceberg_partitioned')""".
+         format(tbl_name))
+    ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
+    assert error_msg in str(ex)
+    # 'iceberg.mr.table.identifier' can refer to another table
+    self.run_stmt_in_hive("""
+        alter table {0} unset tblproperties('iceberg.table_identifier')""".
+        format(tbl_name))
+    self.run_stmt_in_hive("""alter table {0} set tblproperties
+        ('iceberg.mr.table.identifier'='functional_iceberg.iceberg_partitioned')""".
+        format(tbl_name))
+    ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
+    assert error_msg in str(ex)
+    # 'name' can also refer to another table but cannot be set by Hive/Impala. Also,
+    # during table migration both Impala and Hive clear existing table properties.
+    # See IMPALA-12410
+
   @SkipIfFS.incorrent_reported_ec
   def test_compute_stats(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-compute-stats', vector, unique_database)


[impala] 03/03: IMPALA-12400: Test expected executors used for planning when no executor groups are healthy

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b73bc49ea73f74afb815fe8826ec8feddf98d241
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Thu Aug 24 13:21:26 2023 -0700

    IMPALA-12400: Test expected executors used for planning when no executor groups are healthy
    
    Added a custom cluster test that verifies the number of executors used
    for planning when no executor groups are healthy. In that case the
    planner should take the executor count from 'num_expected_executors',
    or from 'expected_executor_group_sets' when that option is set.
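    
    As a hedged illustration of that fallback (hypothetical class and method
    names, not Impala's actual planner code), assuming the query has already
    been mapped to an executor group set such as root.small or root.large:
    
      import java.util.Map;
      import java.util.Optional;
      
      /** Hypothetical sketch of the planning fallback described above. */
      final class ExpectedExecutorCount {
        // When no executor group is healthy, planning falls back to the expected size of
        // the group set the query was assigned to (e.g. root.small:2, root.large:12 from
        // -expected_executor_group_sets), or to -num_expected_executors when no group
        // sets are configured or the set is unknown.
        static int whenNoHealthyGroups(Optional<String> assignedGroupSet,
            Map<String, Integer> expectedGroupSetSizes, int numExpectedExecutors) {
          return assignedGroupSet.map(expectedGroupSetSizes::get)
              .orElse(numExpectedExecutors);
        }
      }
      
      // e.g. whenNoHealthyGroups(Optional.of("root.large"),
      //          Map.of("root.small", 2, "root.large", 12), 7) == 12
      //      whenNoHealthyGroups(Optional.empty(), Map.of(), 7) == 7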
    
    Change-Id: Ib71ca0a5402c74d07ee875878f092d6d3827c6b7
    Reviewed-on: http://gerrit.cloudera.org:8080/20419
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_executor_groups.py | 69 ++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 7d250bda9..af94e68f0 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -1473,3 +1473,72 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "scheduling, F00 scheduled instance count (16) is higher than its effective "
         "count (12)") in profile, profile
     self.client.close_query(handle)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=1,
+      impalad_args="--num_expected_executors=7")
+  def test_expected_executors_no_healthy_groups(self):
+    """Tests expected executor group size used for planning when no executor groups
+    are healthy. Query planning should use expected executor group size from
+    'num_expected_executors' in such cases. If 'expected_executor_group_sets' is
+    set then executor group size is obtained from that config.
+    """
+    # Create fresh client
+    self.create_impala_clients()
+
+    # Compute stats on tpcds_parquet.store_sales
+    self.execute_query_expect_success(self.client,
+        "compute stats tpcds_parquet.store_sales", query_options={'NUM_NODES': '1'})
+
+    # Query planning should use expected number hosts from 'num_expected_executors'.
+    handle = self.execute_query_async(CPU_TEST_QUERY, {
+      "COMPUTE_PROCESSING_COST": "true"})
+    sleep(1)
+    profile = self.client.get_runtime_profile(handle)
+    assert "F00:PLAN FRAGMENT [RANDOM] hosts=7" in profile, profile
+    # The query should run on default resource pool.
+    assert "Request Pool: default-pool" in profile, profile
+
+    coordinator_test_args = ''
+    # The path to resources directory which contains the admission control config files.
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
+                                 "resources")
+
+    # Reuse cluster configuration from _setup_three_exec_group_cluster, but only start
+    # root.large executor groups.
+    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-3-groups.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml")
+
+    # extra args template to start coordinator
+    extra_args_template = ("-vmodule admission-controller=3 "
+        "-expected_executor_group_sets=root.small:2,root.large:12 "
+        "-fair_scheduler_allocation_path %s "
+        "-llama_site_path %s "
+        "%s ")
+
+    # Start with a regular admission config, multiple pools, no resource limits.
+    self._restart_coordinators(num_coordinators=1,
+        extra_args=extra_args_template % (fs_allocation_path, llama_site_path,
+          coordinator_test_args))
+
+    self.create_impala_clients()
+
+    # Small query should get planned using the small executor group set's resources.
+    handle = self.execute_query_async(CPU_TEST_QUERY, {
+      "COMPUTE_PROCESSING_COST": "true"})
+    sleep(1)
+    profile = self.client.get_runtime_profile(handle)
+    assert "F00:PLAN FRAGMENT [RANDOM] hosts=2" in profile, profile
+    # The query should run on root.small resource pool.
+    assert "Request Pool: root.small" in profile, profile
+
+    # Large query should get planned using the large executor group set's resources.
+    handle = self.execute_query_async(GROUPING_TEST_QUERY, {
+      "COMPUTE_PROCESSING_COST": "true"})
+    sleep(1)
+    profile = self.client.get_runtime_profile(handle)
+    assert "F00:PLAN FRAGMENT [RANDOM] hosts=12" in profile, profile
+    # The query should run on root.large resource pool.
+    assert "Request Pool: root.large" in profile, profile
+
+    self.client.close_query(handle)


[impala] 01/03: IMPALA-12408: Optimize HdfsScanNode.computeScanRangeLocations()

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c49f5d2778d10e988ab4d926e3326de043c20fe1
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue Aug 29 16:37:42 2023 +0200

    IMPALA-12408: Optimize HdfsScanNode.computeScanRangeLocations()
    
    computeScanRangeLocations() could be very slow for tables
    with a large number of partitions. This patch minimizes the use
    of two expensive function calls:
    1. HdfsPartition.getLocation()
      - This looks like a simple property but actually decompresses
        the location string.
      - Was often called indirectly through getFsType().
      - After the patch it is only called once per partition.
    2. hadoop.fs.FileSystem.getFileSystem()
      - Hadoop caches the FileSystem object, but the cache key contains
        UserGroupInformation, which is obtained with
        UserGroupInformation.getCurrentUser(), making the call costly.
      - As the user is always the same during Impala planning, we can cache
        the FileSystem simply by the scheme + authority part of the location
        URI. After the patch getFileSystem() is only called when the
        scheme/authority differs from the previous partition's, leading to
        a single call for most tables (see the sketch after this list).
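    
    A standalone sketch of the caching idea in point 2, for illustration; the
    class here is hypothetical, and the actual change is inlined into
    HdfsScanNode.computeScanRangeLocations() (see the diff below):
    
      import java.io.IOException;
      import java.util.Objects;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      
      /** Reuses the previously resolved FileSystem while scheme+authority match. */
      final class LastFileSystemCache {
        private final Configuration conf;
        private String lastScheme;
        private String lastAuthority;
        private FileSystem lastFs;
      
        LastFileSystemCache(Configuration conf) { this.conf = conf; }
      
        FileSystem get(Path path) throws IOException {
          String scheme = path.toUri().getScheme();
          String authority = path.toUri().getAuthority();
          if (lastFs != null && Objects.equals(lastScheme, scheme)
              && Objects.equals(lastAuthority, authority)) {
            return lastFs;  // same file system as the previous partition
          }
          lastFs = path.getFileSystem(conf);  // hits Hadoop's cache and getCurrentUser()
          lastScheme = scheme;
          lastAuthority = authority;
          return lastFs;
        }
      }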
    
    Note that caching these values in HdfsPartition could also help,
    but we preferred to avoid increasing the size of that class.
    
    The patch also changes how we count the number of partitions per
    file system (to avoid the extra calls to getFsType()). This made the
    SampledPartitionMetadata class unnecessary and reverted some of the
    changes in https://gerrit.cloudera.org/#/c/12282/
    
    Benchmarks:
    Measured using tpcds.store_sales (1824 partitions)
    union all'd 256 times:
    explain select * from tpcds_parquet.store_sales256;
    Before patch: 8.8s
    After patch: 1.1s
    
    The improvement is also visible on the full TPC-DS benchmark:
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCDS(2) | parquet / none / none | 0.53    | -8.99%     | 0.29       | -10.78%        |
    +----------+-----------------------+---------+------------+------------+----------------+
    The effect is less significant on higher scale factors.
    
    Testing:
    - ran core tests
    
    Change-Id: Icf3e9c169d65c15df6a6762cc68fbb477fe64a7c
    Reviewed-on: http://gerrit.cloudera.org:8080/20434
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ComputeStatsStmt.java   |   5 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  |   9 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../impala/catalog/local/LocalFsPartition.java     |   7 +-
 .../org/apache/impala/planner/HdfsScanNode.java    | 133 ++++++++++-----------
 .../org/apache/impala/planner/IcebergScanNode.java |   8 +-
 6 files changed, 83 insertions(+), 86 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index fe2ada49e..721b5ec9d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -830,9 +830,8 @@ public class ComputeStatsStmt extends StatementBase {
     // TODO(todd): can we avoid loading all the partitions for this?
     Collection<? extends FeFsPartition> partitions =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
-    Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> sample =
-            FeFsTable.Utils.getFilesSample(hdfsTable,
-        partitions, samplePerc, minSampleBytes, sampleSeed);
+    Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(
+        hdfsTable, partitions, samplePerc, minSampleBytes, sampleSeed);
     long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {
       for (FileDescriptor fd: fds) sampleFileBytes += fd.getFileLength();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index e2fa4e713..06e053258 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -482,7 +482,7 @@ public interface FeFsTable extends FeTable {
      *
      * TODO(IMPALA-9883): Fix this for full ACID tables.
      */
-    public static Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>>
+    public static Map<Long, List<FileDescriptor>>
         getFilesSample(FeFsTable table, Collection<? extends FeFsPartition> inputParts,
             long percentBytes, long minSampleBytes, long randomSeed) {
       Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
@@ -540,15 +540,14 @@ public interface FeFsTable extends FeTable {
       // selected.
       Random rnd = new Random(randomSeed);
       long selectedBytes = 0;
-      Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> result =
+      Map<Long, List<FileDescriptor>> result =
           new HashMap<>();
       while (selectedBytes < targetBytes && numFilesRemaining > 0) {
         int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
         FeFsPartition part = parts[selectedIdx];
-        HdfsScanNode.SampledPartitionMetadata sampledPartitionMetadata =
-            new HdfsScanNode.SampledPartitionMetadata(part.getId(), part.getFsType());
+        Long partId = Long.valueOf(part.getId());
         List<FileDescriptor> sampleFileIdxs = result.computeIfAbsent(
-            sampledPartitionMetadata, partMetadata -> Lists.newArrayList());
+            partId, id -> Lists.newArrayList());
         FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
         sampleFileIdxs.add(fd);
         selectedBytes += fd.getFileLength();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 8afd439cb..ca77e75cc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -909,9 +909,10 @@ public class HdfsPartition extends CatalogObjectImpl
 
   @Override
   public FileSystemUtil.FsType getFsType() {
-    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
-        "Cannot get scheme from path " + getLocationPath());
-    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+    Path location = getLocationPath();
+    Preconditions.checkNotNull(location.toUri().getScheme(),
+        "Cannot get scheme from path " + location);
+    return FileSystemUtil.FsType.getFsType(location.toUri().getScheme());
   }
 
   @Override // FeFsPartition
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index c44345577..01827556b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -122,9 +122,10 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public FileSystemUtil.FsType getFsType() {
-    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
-        "Cannot get scheme from path " + getLocationPath());
-    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+    Path location = getLocationPath();
+    Preconditions.checkNotNull(location.toUri().getScheme(),
+        "Cannot get scheme from path " + location);
+    return FileSystemUtil.FsType.getFsType(location.toUri().getScheme());
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 41a230741..2da3b32e5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -26,12 +26,14 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DescriptorTable;
@@ -327,8 +329,8 @@ public class HdfsScanNode extends ScanNode {
   // this scan node has the count(*) optimization enabled.
   protected SlotDescriptor countStarSlot_ = null;
 
-  // Sampled file descriptors if table sampling is used.
-  Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles_ = null;
+  // Sampled file descriptors if table sampling is used. Grouped by partition id.
+  Map<Long, List<FileDescriptor>> sampledFiles_ = null;
 
   // Conjuncts used to trim the set of partitions passed to this node.
   // Used only to display EXPLAIN information.
@@ -1092,37 +1094,6 @@ public class HdfsScanNode extends ScanNode {
     }
   }
 
-  /**
-   * A collection of metadata associated with a sampled partition. Unlike
-   * {@link FeFsPartition} this class is safe to use in hash-based data structures.
-   */
-  public static final class SampledPartitionMetadata {
-
-    private final long partitionId;
-    private final FileSystemUtil.FsType partitionFsType;
-
-    public SampledPartitionMetadata(
-        long partitionId, FileSystemUtil.FsType partitionFsType) {
-      this.partitionId = partitionId;
-      this.partitionFsType = partitionFsType;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-      SampledPartitionMetadata that = (SampledPartitionMetadata) o;
-      return partitionId == that.partitionId && partitionFsType == that.partitionFsType;
-    }
-
-    @Override
-    public int hashCode() {
-      return java.util.Objects.hash(partitionId, partitionFsType);
-    }
-
-    private FileSystemUtil.FsType getPartitionFsType() { return partitionFsType; }
-  }
-
   /**
    * Computes scan ranges (i.e. hdfs splits) plus their storage locations, including
    * volume ids, based on the given maximum number of bytes each scan range should scan.
@@ -1152,14 +1123,7 @@ public class HdfsScanNode extends ScanNode {
         .getMax_scan_range_length();
     scanRangeSpecs_ = new TScanRangeSpec();
 
-    if (sampledFiles_ != null) {
-      numPartitionsPerFs_ = sampledFiles_.keySet().stream().collect(Collectors.groupingBy(
-          SampledPartitionMetadata::getPartitionFsType, Collectors.counting()));
-    } else {
-      numPartitionsPerFs_.putAll(partitions_.stream().collect(
-          Collectors.groupingBy(FeFsPartition::getFsType, Collectors.counting())));
-    }
-
+    numPartitionsPerFs_ = new TreeMap<>();
     totalFilesPerFs_ = new TreeMap<>();
     totalBytesPerFs_ = new TreeMap<>();
     totalFilesPerFsEC_ = new TreeMap<>();
@@ -1173,15 +1137,43 @@ public class HdfsScanNode extends ScanNode {
             .isOptimize_simple_limit()
         && analyzer.getSimpleLimitStatus() != null
         && analyzer.getSimpleLimitStatus().first);
+
+    // Save the last looked up FileSystem object. It is enough for the scheme and
+    // authority part of the URI to match to ensure that getFileSystem() would return the
+    // same file system. See Hadoop's filesystem caching implementation at
+    // https://github.com/apache/hadoop/blob/1046f9cf9888155c27923f3f56efa107d908ad5b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3867
+    // Note that in the Hadoop code the slow part is UserGroupInformation.getCurrentUser()
+    // which is not important here as the user is always the same in Impala.
+    String lastFsScheme = null;
+    String lastFsAuthority = null;
+    FileSystem lastFileSytem = null;
     for (FeFsPartition partition: partitions_) {
-      // Missing disk id accounting is only done for file systems that support the notion
-      // of disk/storage ids.
+      // Save location to local variable because getLocation() can be slow as it needs to
+      // decompress the partition's location.
+      String partitionLocation = partition.getLocation();
+      Path partitionPath = new Path(partitionLocation);
+      String fsScheme = partitionPath.toUri().getScheme();
+      String fsAuthority = partitionPath.toUri().getAuthority();
+      FileSystemUtil.FsType fsType = FileSystemUtil.FsType.getFsType(fsScheme);
+
       FileSystem partitionFs;
-      try {
-        partitionFs = partition.getLocationPath().getFileSystem(CONF);
-      } catch (IOException e) {
-        throw new ImpalaRuntimeException("Error determining partition fs type", e);
+      if (lastFileSytem != null &&
+         Objects.equals(lastFsScheme, fsScheme) &&
+         Objects.equals(lastFsAuthority, fsAuthority)) {
+        partitionFs = lastFileSytem;
+      } else {
+        try {
+          partitionFs = partitionPath.getFileSystem(CONF);
+        } catch (IOException e) {
+          throw new ImpalaRuntimeException("Error determining partition fs type", e);
+        }
+        lastFsScheme = fsScheme;
+        lastFsAuthority = fsAuthority;
+        lastFileSytem = partitionFs;
       }
+
+      // Missing disk id accounting is only done for file systems that support the notion
+      // of disk/storage ids.
       boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
       List<FileDescriptor> fileDescs = getFileDescriptorsWithLimit(partition, fsHasBlocks,
           isSimpleLimit ? analyzer.getSimpleLimitStatus().second - simpleLimitNumRows
@@ -1191,10 +1183,10 @@ public class HdfsScanNode extends ScanNode {
 
       if (sampledFiles_ != null) {
         // If we are sampling, check whether this partition is included in the sample.
-        fileDescs = sampledFiles_.get(
-            new SampledPartitionMetadata(partition.getId(), partition.getFsType()));
+        fileDescs = sampledFiles_.get(partition.getId());
         if (fileDescs == null) continue;
       }
+
       long partitionNumRows = partition.getNumRows();
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
@@ -1211,7 +1203,7 @@ public class HdfsScanNode extends ScanNode {
         // short-circuiting the scan for a partition key scan).
         long defaultBlockSize = (partition.getFileFormat().isParquetBased()) ?
             analyzer.getQueryOptions().parquet_object_store_split_size :
-            partitionFs.getDefaultBlockSize(partition.getLocationPath());
+            partitionFs.getDefaultBlockSize(partitionPath);
         long maxBlockSize =
             Math.max(defaultBlockSize, FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
         if (scanRangeBytesLimit > 0) {
@@ -1223,32 +1215,34 @@ public class HdfsScanNode extends ScanNode {
       final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
       long partitionMaxScanRangeBytes = 0;
       boolean partitionMissingDiskIds = false;
-      totalBytesPerFs_.merge(partition.getFsType(), partitionBytes, Long::sum);
-      totalFilesPerFs_.merge(partition.getFsType(), (long) fileDescs.size(), Long::sum);
+      totalBytesPerFs_.merge(fsType, partitionBytes, Long::sum);
+      totalFilesPerFs_.merge(fsType, (long) fileDescs.size(), Long::sum);
+      numPartitionsPerFs_.merge(fsType, 1L, Long::sum);
 
       for (FileDescriptor fileDesc: fileDescs) {
         if (!analyzer.getQueryOptions().isAllow_erasure_coded_files() &&
             fileDesc.getIsEc()) {
           throw new ImpalaRuntimeException(String
               .format("Scanning of HDFS erasure-coded file (%s) is not supported",
-                  fileDesc.getAbsolutePath(partition.getLocation())));
+                  fileDesc.getAbsolutePath(partitionLocation)));
         }
 
         // Accumulate on the number of EC files and the total size of such files.
         if (fileDesc.getIsEc()) {
-          totalFilesPerFsEC_.merge(partition.getFsType(), 1L, Long::sum);
-          totalBytesPerFsEC_.merge(
-              partition.getFsType(), fileDesc.getFileLength(), Long::sum);
+          totalFilesPerFsEC_.merge(fsType, 1L, Long::sum);
+          totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), Long::sum);
         }
 
         if (!fsHasBlocks) {
           Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
-          generateScanRangeSpecs(partition, fileDesc, scanRangeBytesLimit);
+          generateScanRangeSpecs(
+              partition, partitionLocation, fileDesc, scanRangeBytesLimit);
         } else {
           // Skips files that have no associated blocks.
           if (fileDesc.getNumFileBlocks() == 0) continue;
           Pair<Boolean, Long> result = transformBlocksToScanRanges(
-              partition, fileDesc, fsHasBlocks, scanRangeBytesLimit, analyzer);
+              partition, partitionLocation, fsType, fileDesc, fsHasBlocks,
+              scanRangeBytesLimit, analyzer);
           partitionMaxScanRangeBytes =
               Math.max(partitionMaxScanRangeBytes, result.second);
           if (result.first) partitionMissingDiskIds = true;
@@ -1310,7 +1304,7 @@ public class HdfsScanNode extends ScanNode {
    * @param minSampleBytes minimum number of bytes to read.
    * @param randomSeed used for random number generation.
    */
-  protected Map<SampledPartitionMetadata, List<FileDescriptor>> getFilesSample(
+  protected Map<Long, List<FileDescriptor>> getFilesSample(
       long percentBytes, long minSampleBytes, long randomSeed) {
     return FeFsTable.Utils.getFilesSample(tbl_, partitions_, percentBytes, minSampleBytes,
         randomSeed);
@@ -1342,17 +1336,21 @@ public class HdfsScanNode extends ScanNode {
    * Used for file systems that do not have any physical attributes associated with
    * blocks (e.g., replica locations, caching, etc.). 'maxBlock' size determines how large
    * the scan ranges can be (may be ignored if the file is not splittable).
+   * Expects partition's location string in partitionLocation as getting it from
+   * FeFsPartition can be expensive.
    */
-  private void generateScanRangeSpecs(
-      FeFsPartition partition, FileDescriptor fileDesc, long maxBlockSize) {
+  private void generateScanRangeSpecs(FeFsPartition partition, String partitionLocation,
+      FileDescriptor fileDesc, long maxBlockSize) {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() == 0);
     Preconditions.checkArgument(maxBlockSize > 0);
     if (fileDesc.getFileLength() <= 0) return;
     boolean splittable = partition.getFileFormat().isSplittable(
         HdfsCompression.fromFileName(fileDesc.getPath()));
+    // Hashing must use String.hashCode() for consistency.
+    int partitionHash = partitionLocation.hashCode();
     TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
         fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(),
-        partition.getLocation().hashCode());
+        partitionHash);
     scanRangeSpecs_.addToSplit_specs(splitSpec);
     long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
     if (splittable && !isPartitionKeyScan_) {
@@ -1371,14 +1369,15 @@ public class HdfsScanNode extends ScanNode {
    * coordinator can assign ranges to workers to avoid remote reads. These
    * TScanRangeLocationLists are added to scanRanges_. A pair is returned that indicates
    * whether the file has a missing disk id and the maximum scan range (in bytes) found.
+   * Expects partition's location string in partitionLocation and filesystem type in
+   * fsType as getting these from FeFsPartition can be expensive.
    */
   private Pair<Boolean, Long> transformBlocksToScanRanges(FeFsPartition partition,
-      FileDescriptor fileDesc, boolean fsHasBlocks,
-      long scanRangeBytesLimit, Analyzer analyzer) {
+      String partitionLocation, FileSystemUtil.FsType fsType, FileDescriptor fileDesc,
+      boolean fsHasBlocks, long scanRangeBytesLimit, Analyzer analyzer) {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() > 0);
     boolean fileDescMissingDiskIds = false;
     long fileMaxScanRangeBytes = 0;
-    FileSystemUtil.FsType fsType = partition.getFsType();
     int i = 0;
     if (isPartitionKeyScan_ && (partition.getFileFormat().isParquetBased()
         || partition.getFileFormat() == HdfsFileFormat.ORC)) {
@@ -1430,7 +1429,7 @@ public class HdfsScanNode extends ScanNode {
         THdfsFileSplit hdfsFileSplit = new THdfsFileSplit(fileDesc.getRelativePath(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
-            partition.getLocation().hashCode());
+            partitionLocation.hashCode());
         hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
         hdfsFileSplit.setIs_encrypted(fileDesc.getIsEncrypted());
         hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
@@ -1454,7 +1453,7 @@ public class HdfsScanNode extends ScanNode {
       ++numFilesNoDiskIds_;
       if (LOG.isTraceEnabled()) {
         LOG.trace("File blocks mapping to unknown disk ids. Dir: "
-            + partition.getLocation() + " File:" + fileDesc.toString());
+            + partitionLocation + " File:" + fileDesc.toString());
       }
     }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index c2a028aac..c798c4160 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -180,7 +180,7 @@ public class IcebergScanNode extends HdfsScanNode {
    * The algorithm is based on FeFsTable.Utils.getFilesSample()
    */
   @Override
-  protected Map<SampledPartitionMetadata, List<FileDescriptor>> getFilesSample(
+  protected Map<Long, List<FileDescriptor>> getFilesSample(
       long percentBytes, long minSampleBytes, long randomSeed) {
     Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
     Preconditions.checkState(minSampleBytes >= 0);
@@ -191,8 +191,6 @@ public class IcebergScanNode extends HdfsScanNode {
 
     Preconditions.checkState(partitions_.size() == 1);
     FeFsPartition part = partitions_.get(0);
-    SampledPartitionMetadata sampledPartitionMetadata =
-        new SampledPartitionMetadata(part.getId(), part.getFsType());
 
     long totalBytes = 0;
     for (FileDescriptor fd : orderedFds) {
@@ -218,8 +216,8 @@ public class IcebergScanNode extends HdfsScanNode {
       orderedFds.set(selectedIdx, orderedFds.get(numFilesRemaining - 1));
       --numFilesRemaining;
     }
-    Map<SampledPartitionMetadata, List<FileDescriptor>> result = new HashMap<>();
-    result.put(sampledPartitionMetadata, sampleFiles);
+    Map<Long, List<FileDescriptor>> result = new HashMap<>();
+    result.put(part.getId(), sampleFiles);
     return result;
   }