You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/11 18:07:56 UTC

(impala) branch master updated (cdac777c5 -> a2b8aed2c)

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from cdac777c5 IMPALA-12605: Fix ALTER TABLE SET PARTITION SPEC field id distribution
     new 9cc0aa886 IMPALA-12495: Describe statement for Iceberg metadata tables
     new 117b35b17 IMPALA-12703: ExchangeNode should use getFilteredCardinality
     new 32b29ff36 IMPALA-12356: Fix first ALTER_PARTITION event from Hive could be treated as self event
     new 2d5307418 IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations
     new a2b8aed2c IMPALA-12702: Show reduced cardinality estimation in ExecSummary

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/frontend.cc                         |   3 +
 common/thrift/Frontend.thrift                      |   7 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  22 +-
 .../apache/impala/analysis/DescribeTableStmt.java  |  14 ++
 .../org/apache/impala/analysis/FromClause.java     |   3 -
 .../java/org/apache/impala/analysis/TableName.java |   3 +-
 .../impala/catalog/CatalogServiceCatalog.java      |   4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   6 +-
 .../impala/catalog/events/MetastoreEvents.java     |  22 +-
 .../catalog/monitor/CatalogOperationTracker.java   | 115 ++++++---
 .../org/apache/impala/planner/ExchangeNode.java    |   2 +-
 .../java/org/apache/impala/planner/PlanNode.java   |   3 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  27 +--
 .../impala/service/DescribeResultFactory.java      |  18 +-
 .../java/org/apache/impala/service/Frontend.java   |  70 ++++--
 .../org/apache/impala/service/JniFrontend.java     |  11 +-
 .../authorization/AuthorizationTestBase.java       |   8 +-
 .../queries/PlannerTest/tpcds-processing-cost.test |  12 +-
 .../queries/QueryTest/iceberg-metadata-tables.test | 268 ++++++++++++++++++++-
 .../queries/QueryTest/runtime_filters.test         |   6 +-
 tests/custom_cluster/test_events_custom_configs.py |  58 +++--
 tests/custom_cluster/test_web_pages.py             |  70 +++++-
 tests/metadata/test_event_processing.py            |  44 ++++
 tests/query_test/test_observability.py             |  18 ++
 24 files changed, 644 insertions(+), 170 deletions(-)


(impala) 05/05: IMPALA-12702: Show reduced cardinality estimation in ExecSummary

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a2b8aed2c2d4c1bb25ed9626a2b014b79ec741ad
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Jan 10 10:16:02 2024 -0800

    IMPALA-12702: Show reduced cardinality estimation in ExecSummary
    
    In the query profile, cardinality reduction from IMPALA-12018 is
    highlighted in Plan section, but missing out from ExecSummary section.
    This patch changes the ExecSummary to show the reduced cardinality
    estimation if it set.
    
    Testing:
    - Add TestObservability::test_reduced_cardinality_by_filter
    
    Change-Id: If1f51ce585a1cb66e518b725686ab3076ffa8168
    Reviewed-on: http://gerrit.cloudera.org:8080/20879
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../main/java/org/apache/impala/planner/PlanNode.java  |  3 ++-
 .../queries/QueryTest/runtime_filters.test             |  6 +++---
 tests/query_test/test_observability.py                 | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index bdd2fbcb0..e14195f12 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -498,7 +498,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     msg.limit = limit_;
 
     TExecStats estimatedStats = new TExecStats();
-    estimatedStats.setCardinality(cardinality_);
+    estimatedStats.setCardinality(
+        filteredCardinality_ > -1 ? filteredCardinality_ : cardinality_);
     estimatedStats.setMemory_used(nodeResourceProfile_.getMemEstimateBytes());
     msg.setLabel(getDisplayLabel());
     msg.setLabel_detail(getDisplayLabelDetail());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index b06b39b3c..00c6c96a6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -27,7 +27,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RUNTIME_PROFILE
 aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+608.*
 ====
 
 
@@ -59,7 +59,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RUNTIME_PROFILE
 aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+608.*
 ====
 
 
@@ -321,7 +321,7 @@ with t1 as (select month x, bigint_col y from alltypes limit 7301),
 ---- RUNTIME_PROFILE
 aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+1.82K.*
 ====
 
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index d82b3b0d7..597dea515 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -872,6 +872,24 @@ class TestObservability(ImpalaTestSuite):
     assert len(re.findall('Single node plan created:', runtime_profile, re.M)) == 2
     assert len(re.findall('Distributed plan created:', runtime_profile, re.M)) == 2
 
+  def test_reduced_cardinality_by_filter(self):
+    """IMPALA-12702: Check that ExecSummary shows the reduced cardinality estimation."""
+    query_opts = {'compute_processing_cost': True}
+    query = """select STRAIGHT_JOIN count(*) from
+        (select l_orderkey from tpch_parquet.lineitem) a
+        join (select o_orderkey, o_custkey from tpch_parquet.orders) l1
+          on a.l_orderkey = l1.o_orderkey
+        where l1.o_custkey < 1000"""
+    result = self.execute_query(query, query_opts)
+    scan = result.exec_summary[10]
+    assert scan['operator'] == '00:SCAN HDFS'
+    assert scan['num_rows'] == 39563
+    assert scan['est_num_rows'] == 575771
+    assert scan['detail'] == 'tpch_parquet.lineitem'
+    runtime_profile = result.runtime_profile
+    assert "cardinality=575.77K(filtered from 6.00M)" in runtime_profile
+
+
 class TestQueryStates(ImpalaTestSuite):
   """Test that the 'Query State' and 'Impala Query State' are set correctly in the
   runtime profile."""


(impala) 03/05: IMPALA-12356: Fix first ALTER_PARTITION event from Hive could be treated as self event

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 32b29ff36fb3e05fd620a6714de88805052d0117
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Fri Sep 15 12:44:27 2023 +0530

    IMPALA-12356: Fix first ALTER_PARTITION event from Hive could be
    treated as self event
    
    Self event check for add partition event is done only for the
    transactional tables with IMPALA-10502 (commit id: 7f7a631). But
    during addition of new partition(with insert statement), catalog
    service id and version number are added to partition params of the
    parition irrespective of whether the table is transactional or not.
    Thus the version number is added to partition's inFlightEvents_ and
    remained in it until the next alter partition event from hive. Thus
    led to detection of the alter partition event as self event.
    
    This commit ensures the catalog service id and version number are not
    added to partition params if the partition is added to a
    non-transactional table.
    
    Also fixed another bug in reload event. Reload event self check
    fails due to the above fix as it expects catalog service id and
    version number in the partition params. Fixed to use last refreshed
    event id to skip the self reload events.
    
    Testing:
    - Manually tested in cluster and added testcases
    
    Change-Id: I23c2affa3fe32c0b3843bff5e4c0018dce9060d3
    Reviewed-on: http://gerrit.cloudera.org:8080/20486
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  6 +--
 .../impala/catalog/events/MetastoreEvents.java     | 22 +++-----
 .../apache/impala/service/CatalogOpExecutor.java   | 27 +++++-----
 tests/custom_cluster/test_events_custom_configs.py | 58 +++++++++++++---------
 tests/metadata/test_event_processing.py            | 44 ++++++++++++++++
 6 files changed, 101 insertions(+), 60 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5ffe33e93..83db6f7b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -4025,9 +4025,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info("Partition {} of table {}.{} has last refresh id as {}. " +
           "Comparing it with {}.", hdfsPartition.getPartitionName(), dbName, tableName,
           hdfsPartition.getLastRefreshEventId(), eventId);
-      if (hdfsPartition.getLastRefreshEventId() > eventId) {
-        return true;
-      }
+      if (hdfsPartition.getLastRefreshEventId() >= eventId) return true;
     } catch (CatalogException ex) {
       LOG.warn("Encountered an exception while the partition's last refresh event id: "
           + dbName + "." + tableName + ". Ignoring further processing and try to " +
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 861a01adc..e7810344d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -1786,9 +1786,9 @@ public class HdfsPartition extends CatalogObjectImpl
           && hmsParameters_.equals(oldInstance.hmsParameters_)
           && partitionStats_ == oldInstance.partitionStats_
           && hasIncrementalStats_ == oldInstance.hasIncrementalStats_
-          && numRows_ == oldInstance.numRows_
-          && writeId_ == oldInstance.writeId_
-          && lastCompactionId_ == oldInstance.lastCompactionId_);
+          && numRows_ == oldInstance.numRows_ && writeId_ == oldInstance.writeId_
+          && lastCompactionId_ == oldInstance.lastCompactionId_
+          && lastRefreshEventId_ == oldInstance_.lastRefreshEventId_);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 062b9fd84..6dcd86420 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2711,22 +2711,13 @@ public class MetastoreEvents {
 
     @Override
     public SelfEventContext getSelfEventContext() {
-      if (reloadPartition_ != null) {
-        // create selfEventContext for reload partition event
-        List<TPartitionKeyValue> tPartSpec =
-            getTPartitionSpecFromHmsPartition(msTbl_, reloadPartition_);
-        return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
-            reloadPartition_.getParameters(), null);
-      } else {
-        // create selfEventContext for reload table event
-        return new SelfEventContext(
-            dbName_, tblName_, null, msTbl_.getParameters());
-      }
+      throw new UnsupportedOperationException("Self-event evaluation is unnecessary for"
+          + " this event type");
     }
 
     @Override
     public void processTableEvent() throws MetastoreNotificationException {
-      if (isSelfEvent() || isOlderEvent()) {
+      if (isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
         infoLog("Incremented events skipped counter to {}",
@@ -2750,9 +2741,10 @@ public class MetastoreEvents {
         return false;
       }
       // Always check the lastRefreshEventId on the table first for table level refresh
-      if (tbl_.getLastRefreshEventId() > getEventId() || (reloadPartition_ != null &&
-          catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, reloadPartition_,
-              getEventId()))) {
+      if (tbl_.getLastRefreshEventId() >= getEventId()
+          || (reloadPartition_ != null
+                 && catalog_.isPartitionLoadedAfterEvent(
+                        dbName_, tblName_, reloadPartition_, getEventId()))) {
         return true;
       }
       return false;
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 29178180c..edd68215c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4556,7 +4556,8 @@ public class CatalogOpExecutor {
         if (removed) {
           LOG.info(
               "EventId: {} Skipping addition of partition {} since it was removed later"
-                  + "in catalog for table {}", eventId,
+                  + " in catalog for table {}",
+              eventId,
               FileUtils.makePartName(hdfsTable.getClusteringColNames(), part.getValues()),
               hdfsTable.getFullName());
         } else {
@@ -6737,7 +6738,7 @@ public class CatalogOpExecutor {
         }
         Preconditions.checkNotNull(tbl, "tbl is null in " + cmdString);
         // fire event for refresh event and update the last refresh event id
-        fireReloadEventAndUpdateRefreshEventId(req, updatedThriftTable, tblName, tbl);
+        fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl);
       }
 
       // Return the TCatalogObject in the result to indicate this request can be
@@ -6783,31 +6784,22 @@ public class CatalogOpExecutor {
    * This class invokes metastore shim's fireReloadEvent to fire event to HMS
    * and update the last refresh event id in the cache
    * @param req - request object for TResetMetadataRequest.
-   * @param updatedThriftTable - updated thrift table after refresh query
    * @param tblName
    * @param tbl
    */
-  private void fireReloadEventAndUpdateRefreshEventId(TResetMetadataRequest req,
-      TCatalogObject updatedThriftTable, TableName tblName, Table tbl) {
+  private void fireReloadEventAndUpdateRefreshEventId(
+      TResetMetadataRequest req, TableName tblName, Table tbl) {
     List<String> partVals = null;
     if (req.isSetPartition_spec()) {
       partVals = req.getPartition_spec().stream().
           map(partSpec -> partSpec.getValue()).collect(Collectors.toList());
     }
     try {
-      // Get new catalog version for table refresh/invalidate.
-      long newCatalogVersion = updatedThriftTable.getCatalog_version();
-      Map<String, String> tableParams = new HashMap<>();
-      tableParams.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
-          catalog_.getCatalogServiceId());
-      tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
-          String.valueOf(newCatalogVersion));
       List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
           catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(),
-          tblName.getTbl(), tableParams);
+          tblName.getTbl(), Collections.emptyMap());
       if (req.isIs_refresh()) {
         if (catalog_.tryLock(tbl, true, 600000)) {
-          catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
           if (!eventIds.isEmpty()) {
             if (req.isSetPartition_spec()) {
               HdfsPartition partition = ((HdfsTable) tbl)
@@ -6982,7 +6974,12 @@ public class CatalogOpExecutor {
               partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, partName));
               partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
               partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" + partName);
-              addCatalogServiceIdentifiers(msTbl, partition);
+              if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
+                // Self event detection is deprecated for non-transactional tables add
+                // partition. So we add catalog service identifiers only for
+                // transactional tables
+                addCatalogServiceIdentifiers(msTbl, partition);
+              }
               MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse);
             }
 
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 5f5942a3e..aaebde2b9 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -233,15 +233,24 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--hms_event_polling_interval_s=5"
+                    " --enable_reload_events=true")
+  def test_refresh_invalidate_events(self, unique_database):
+    self.run_test_refresh_invalidate_events(unique_database, "reload_table")
+
   @CustomClusterTestSuite.with_args(
       catalogd_args="--hms_event_polling_interval_s=5"
                     " --enable_reload_events=true"
                     " --enable_sync_to_latest_event_on_ddls=true")
-  def test_refresh_invalidate_events(self, unique_database):
+  def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database):
+    self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True)
+
+  def run_test_refresh_invalidate_events(self, unique_database, test_reload_table,
+    enable_sync_to_latest_event_on_ddls=False):
     """Test is to verify Impala-11808, refresh/invalidate commands should generate a
     Reload event in HMS and CatalogD's event processor should process this event.
     """
-    test_reload_table = "test_reload_table"
     self.client.execute(
       "create table {}.{} (i int) partitioned by (year int) "
         .format(unique_database, test_reload_table))
@@ -275,28 +284,29 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
     EventProcessorUtils.wait_for_event_processing(self)
 
-    # Test to verify if older events are being skipped in event processor
-    data = FireEventRequestData()
-    data.refreshEvent = True
-    req = FireEventRequest(True, data)
-    req.dbName = unique_database
-    req.tableName = test_reload_table
-    # table level reload events
-    tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
-    for i in range(10):
-      self.hive_client.fire_listener_event(req)
-    EventProcessorUtils.wait_for_event_processing(self)
-    tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
-    assert tbl_events_skipped_after > tbl_events_skipped_before
-    # partition level reload events
-    EventProcessorUtils.wait_for_event_processing(self)
-    part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
-    req.partitionVals = ["2022"]
-    for i in range(10):
-      self.hive_client.fire_listener_event(req)
-    EventProcessorUtils.wait_for_event_processing(self)
-    part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
-    assert part_events_skipped_after > part_events_skipped_before
+    if enable_sync_to_latest_event_on_ddls:
+      # Test to verify if older events are being skipped in event processor
+      data = FireEventRequestData()
+      data.refreshEvent = True
+      req = FireEventRequest(True, data)
+      req.dbName = unique_database
+      req.tableName = test_reload_table
+      # table level reload events
+      tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+      for i in range(10):
+        self.hive_client.fire_listener_event(req)
+      EventProcessorUtils.wait_for_event_processing(self)
+      tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+      assert tbl_events_skipped_after > tbl_events_skipped_before
+      # partition level reload events
+      EventProcessorUtils.wait_for_event_processing(self)
+      part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+      req.partitionVals = ["2022"]
+      for i in range(10):
+        self.hive_client.fire_listener_event(req)
+      EventProcessorUtils.wait_for_event_processing(self)
+      part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+      assert part_events_skipped_after > part_events_skipped_before
 
     # Test to verify IMPALA-12213
     table = self.hive_client.get_table(unique_database, test_reload_table)
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index f0358b7aa..ff764db78 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -432,6 +432,50 @@ class TestEventProcessing(ImpalaTestSuite):
     finally:
       check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])
 
+  def test_transact_partition_location_change_from_hive(self, unique_database):
+    """IMPALA-12356: Verify alter partition from hive on transactional table"""
+    self.run_test_partition_location_change_from_hive(unique_database,
+                                                      "transact_alter_part_hive", True)
+
+  def test_partition_location_change_from_hive(self, unique_database):
+    """IMPALA-12356: Verify alter partition from hive on non-transactional table"""
+    self.run_test_partition_location_change_from_hive(unique_database, "alter_part_hive")
+
+  def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
+    is_transactional=False):
+    fq_tbl_name = unique_database + "." + tbl_name
+    TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
+    # Create the table
+    self.client.execute(
+      "create table %s (i int) partitioned by(j int) stored as parquet %s"
+      % (fq_tbl_name, TBLPROPERTIES))
+    # Insert some data to a partition
+    p1 = "j=1"
+    self.client.execute("insert into table %s partition(%s) values (0),(1),(2)"
+                        % (fq_tbl_name, p1))
+    tbl_location = self._get_table_property("Location:", fq_tbl_name)
+    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
+    assert [("{0}/{1}".format(tbl_location, p1),)] == partitions
+    # Alter partition location from hive
+    new_part_location = tbl_location + "/j=2"
+    self.run_stmt_in_hive("alter table %s partition(%s) set location '%s'"
+                          % (fq_tbl_name, p1, new_part_location))
+    EventProcessorUtils.wait_for_event_processing(self)
+    # Verify if the location is updated
+    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
+    assert [(new_part_location,)] == partitions
+
+  def _get_table_property(self, property_name, table_name):
+    """Extract the table property value from output of DESCRIBE FORMATTED."""
+    result = self.client.execute("describe formatted {0}".format(table_name))
+    for row in result.data:
+      if property_name in row:
+        row = row.split('\t')
+        if row[1] == 'NULL':
+          break
+        return row[1].rstrip()
+    return None
+
   def __get_transactional_tblproperties(self, is_transactional):
     """
     Util method to generate the tblproperties for transactional tables


(impala) 02/05: IMPALA-12703: ExchangeNode should use getFilteredCardinality

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 117b35b17df9ff63301f19e3719d13ab777ccbeb
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Jan 10 14:06:26 2024 -0800

    IMPALA-12703: ExchangeNode should use getFilteredCardinality
    
    IMPALA-12018 changed the CPU costing formula from using getCardinality()
    to getFilteredCardinality() for DataStreamSink, HashJoinNode, JoinNode,
    and NestedLoopJoinNode. However, it miss to do the same for
    ExchangeNode, which is also eligible for cardinality reduction by
    runtime filter. This patch fix the formula for ExchangeNode.
    
    Testing
    - Pass PlannerTest#testProcessingCost.
    
    Change-Id: I62a649b67c75c46bd57d8ceda80265af3321d85b
    Reviewed-on: http://gerrit.cloudera.org:8080/20880
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/java/org/apache/impala/planner/ExchangeNode.java |  2 +-
 .../queries/PlannerTest/tpcds-processing-cost.test           | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index d2882cc77..1f96b8156 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -234,7 +234,7 @@ public class ExchangeNode extends PlanNode {
     float conjunctsCost = ExprUtil.computeExprsTotalCost(conjuncts_);
     float materializationCost = estimateSerializationCostPerRow();
     processingCost_ = ProcessingCost.basicCost(getDisplayLabel() + "(receiving)",
-        getChild(0).getCardinality(), conjunctsCost, materializationCost);
+        getChild(0).getFilteredCardinality(), conjunctsCost, materializationCost);
 
     if (isBroadcastExchange()) {
       processingCost_ = ProcessingCost.broadcastCost(processingCost_,
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
index 8ce1dfeff..21f6cf47d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
@@ -6642,7 +6642,7 @@ PLAN-ROOT SINK
 |  |
 |  F14:PLAN FRAGMENT [HASH(sts.ss_item_sk,sts.ss_ticket_number)] hosts=3 instances=6 (adjusted from 48)
 |  Per-Instance Resources: mem-estimate=14.55MB mem-reservation=2.00MB thread-reservation=1
-|  max-parallelism=6 segment-costs=[607052, 1000] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
+|  max-parallelism=6 segment-costs=[602921, 1000] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
 |  28:AGGREGATE [STREAMING]
 |  |  output: sum(CAST(coalesce(sr.sr_return_quantity, CAST(0 AS INT)) AS BIGINT)), sum(CAST(coalesce(sts.ss_quantity, CAST(0 AS INT)) AS BIGINT)), sum(coalesce(sr.sr_return_amt, CAST(0 AS DECIMAL(7,2)))), sum(coalesce(sts.ss_net_paid, CAST(0 AS DECIMAL(7,2))))
 |  |  group by: sts.ss_item_sk
@@ -6729,7 +6729,7 @@ PLAN-ROOT SINK
 |  |
 |  48:EXCHANGE [HASH(sts.ss_item_sk,sts.ss_ticket_number)]
 |  |  mem-estimate=3.14MB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=16 row-size=32B cardinality=170.55K(filtered from 288.04K) cost=10127
+|  |  tuple-ids=16 row-size=32B cardinality=170.55K(filtered from 288.04K) cost=5996
 |  |  in pipelines: 23(GETNEXT)
 |  |
 |  F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 (adjusted from 48)
@@ -6810,7 +6810,7 @@ PLAN-ROOT SINK
 |  |
 |  F08:PLAN FRAGMENT [HASH(cs.cs_item_sk,cs.cs_order_number)] hosts=3 instances=6 (adjusted from 48)
 |  Per-Instance Resources: mem-estimate=13.08MB mem-reservation=2.00MB thread-reservation=1
-|  max-parallelism=6 segment-costs=[302670, 499] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
+|  max-parallelism=6 segment-costs=[300592, 499] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
 |  17:AGGREGATE [STREAMING]
 |  |  output: sum(CAST(coalesce(cr.cr_return_quantity, CAST(0 AS INT)) AS BIGINT)), sum(CAST(coalesce(cs.cs_quantity, CAST(0 AS INT)) AS BIGINT)), sum(coalesce(cr.cr_return_amount, CAST(0 AS DECIMAL(7,2)))), sum(coalesce(cs.cs_net_paid, CAST(0 AS DECIMAL(7,2))))
 |  |  group by: cs.cs_item_sk
@@ -6897,7 +6897,7 @@ PLAN-ROOT SINK
 |  |
 |  42:EXCHANGE [HASH(cs.cs_item_sk,cs.cs_order_number)]
 |  |  mem-estimate=1.68MB mem-reservation=0B thread-reservation=0
-|  |  tuple-ids=8 row-size=32B cardinality=85.03K(filtered from 144.16K) cost=5068
+|  |  tuple-ids=8 row-size=32B cardinality=85.03K(filtered from 144.16K) cost=2990
 |  |  in pipelines: 12(GETNEXT)
 |  |
 |  F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 (adjusted from 48)
@@ -6978,7 +6978,7 @@ max-parallelism=6 segment-costs=[21550, 4260, 317] cpu-comparison-result=22 [max
 |
 F02:PLAN FRAGMENT [HASH(ws.ws_item_sk,ws.ws_order_number)] hosts=3 instances=6 (adjusted from 48)
 Per-Instance Resources: mem-estimate=12.35MB mem-reservation=2.00MB thread-reservation=1
-max-parallelism=6 segment-costs=[151615, 250] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
+max-parallelism=6 segment-costs=[150583, 250] cpu-comparison-result=22 [max(12 (self) vs 22 (sum children))]
 06:AGGREGATE [STREAMING]
 |  output: sum(CAST(coalesce(wr.wr_return_quantity, CAST(0 AS INT)) AS BIGINT)), sum(CAST(coalesce(ws.ws_quantity, CAST(0 AS INT)) AS BIGINT)), sum(coalesce(wr.wr_return_amt, CAST(0 AS DECIMAL(7,2)))), sum(coalesce(ws.ws_net_paid, CAST(0 AS DECIMAL(7,2))))
 |  group by: ws.ws_item_sk
@@ -7065,7 +7065,7 @@ max-parallelism=6 segment-costs=[151615, 250] cpu-comparison-result=22 [max(12 (
 |
 36:EXCHANGE [HASH(ws.ws_item_sk,ws.ws_order_number)]
 |  mem-estimate=965.35KB mem-reservation=0B thread-reservation=0
-|  tuple-ids=0 row-size=32B cardinality=42.59K(filtered from 71.94K) cost=2530
+|  tuple-ids=0 row-size=32B cardinality=42.59K(filtered from 71.94K) cost=1498
 |  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 (adjusted from 48)


(impala) 01/05: IMPALA-12495: Describe statement for Iceberg metadata tables

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9cc0aa886cbbaac3f69c7ea91b397ebbea739986
Author: Tamas Mate <tm...@cloudera.com>
AuthorDate: Wed Nov 8 13:07:32 2023 +0100

    IMPALA-12495: Describe statement for Iceberg metadata tables
    
    Iceberg metadata tables are virtual tables and their schemata are
    predefined by the Iceberg library. This commit extends the DESCRIBE
    <table> statement, so the users can print the table description of these
    tables.
    
    Metadata tables do not exist in the HMS, therefore
    DESCRIBE FORMATTED|EXTENDED statements are not permitted.
    
    Testing:
     - Added E2E tests
    
    Change-Id: Ibe22f271a59a6885035991c09b5101193ade6e97
    Reviewed-on: http://gerrit.cloudera.org:8080/20695
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/frontend.cc                         |   3 +
 common/thrift/Frontend.thrift                      |   7 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  22 +-
 .../apache/impala/analysis/DescribeTableStmt.java  |  14 ++
 .../org/apache/impala/analysis/FromClause.java     |   3 -
 .../java/org/apache/impala/analysis/TableName.java |   3 +-
 .../impala/service/DescribeResultFactory.java      |  18 +-
 .../java/org/apache/impala/service/Frontend.java   |  70 ++++--
 .../org/apache/impala/service/JniFrontend.java     |  11 +-
 .../authorization/AuthorizationTestBase.java       |   8 +-
 .../queries/QueryTest/iceberg-metadata-tables.test | 268 ++++++++++++++++++++-
 11 files changed, 377 insertions(+), 50 deletions(-)

diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index e8f53f3d7..0a9ce18dc 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -186,6 +186,9 @@ Status Frontend::DescribeTable(const TDescribeTableParams& params,
   tparams.__set_output_style(params.output_style);
   if (params.__isset.table_name) tparams.__set_table_name(params.table_name);
   if (params.__isset.result_struct) tparams.__set_result_struct(params.result_struct);
+  if (params.__isset.metadata_table_name) {
+    tparams.__set_metadata_table_name(params.metadata_table_name);
+  }
   tparams.__set_session(session);
   return JniUtil::CallJniMethod(fe_, describe_table_id_, tparams, response);
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index e7118ed19..f19f532c2 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -187,11 +187,14 @@ struct TDescribeTableParams {
   // Set when describing a table.
   2: optional CatalogObjects.TTableName table_name
 
+  // Set for metadata tables
+  3: optional string metadata_table_name
+
   // Set when describing a path to a nested collection.
-  3: optional Types.TColumnType result_struct
+  4: optional Types.TColumnType result_struct
 
   // Session state for the user who initiated this request.
-  4: optional Query.TSessionState session
+  5: optional Query.TSessionState session
 }
 
 // Results of a call to describeDb() and describeTable()
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 91e3a0151..f6ce7b2bd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -3563,6 +3563,9 @@ public class Analyzer {
    */
   public FeTable getTable(TableName tblName, boolean mustExist)
       throws AnalysisException, TableLoadingException {
+    if (IcebergMetadataTable.isIcebergMetadataTable(tblName.toPath())) {
+      return getMetadataVirtualTable(tblName.toPath());
+    }
     FeTable table = globalState_.stmtTableCache.tables.get(tblName);
     if (table == null) {
       if (!mustExist) {
@@ -3603,13 +3606,15 @@ public class Analyzer {
   }
 
   /**
-   * Adds a new Iceberg metadata table to the stmt table cache. At this point it is
-   * unknown if the base table is loaded for scanning as well, therefore the original
-   * table is kept. The metadata table will have its vTbl field filled, while the original
-   * table gets a new key without the vTbl field.
-   * 'tblRefPath' parameter has to be an IcebergMetadataTable reference path.
+   * Retrieves the Iceberg metadata table from the stmtTableCache if the Iceberg metadata
+   * table exists or creates and adds it to the stmtTableCache if it does not exist. At
+   * this point it is unknown if the base table is loaded for scanning as well, therefore
+   * the original table is kept. The metadata table will have its vTbl field filled, while
+   * the original table gets a new key without the vTbl field. 'tblRefPath' parameter has
+   * to be an IcebergMetadataTable reference path. Returns the the Iceberg metadata table.
    */
-  public void addMetadataVirtualTable(List<String> tblRefPath) throws AnalysisException {
+  public FeTable getMetadataVirtualTable(List<String> tblRefPath)
+      throws AnalysisException {
     Preconditions.checkArgument(IcebergMetadataTable.isIcebergMetadataTable(tblRefPath));
     try {
       TableName catalogTableName = new TableName(tblRefPath.get(0),
@@ -3619,11 +3624,14 @@ public class Analyzer {
       // The catalog table (the base of the virtual table) has been loaded and cached
       // under the name of the virtual table.
       FeTable catalogTable = getStmtTableCache().tables.get(virtualTableName);
-      if (catalogTable instanceof IcebergMetadataTable || catalogTable == null) return;
+      if (catalogTable instanceof IcebergMetadataTable || catalogTable == null) {
+        return catalogTable;
+      }
       IcebergMetadataTable virtualTable =
           new IcebergMetadataTable(catalogTable, tblRefPath.get(2));
       getStmtTableCache().tables.put(catalogTableName, catalogTable);
       getStmtTableCache().tables.put(virtualTableName, virtualTable);
+      return virtualTable;
     } catch (ImpalaRuntimeException e) {
       throw new AnalysisException("Could not create metadata table for table "
           + "reference: " + StringUtils.join(tblRefPath, "."), e);
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
index ec99f6f62..4830354df 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
@@ -25,6 +25,7 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeTableParams;
@@ -124,6 +125,7 @@ public class DescribeTableStmt extends StatementBase {
     // all columns returning an empty result due to insufficient VIEW_METADATA privilege.
     analyzer.getTable(table_.getTableName(), /* add column-level privilege */ true,
         Privilege.ANY);
+    checkMinimalForIcebergMetadataTable();
 
     // Describing a table.
     if (path_.destTable() != null) return;
@@ -150,6 +152,14 @@ public class DescribeTableStmt extends StatementBase {
     }
   }
 
+  private void checkMinimalForIcebergMetadataTable() throws AnalysisException {
+    if (table_ instanceof IcebergMetadataTable &&
+        outputStyle_ != TDescribeOutputStyle.MINIMAL) {
+      throw new AnalysisException("DESCRIBE FORMATTED|EXTENDED cannot refer to a "
+          + "metadata table.");
+    }
+  }
+
   public TDescribeTableParams toThrift() {
     TDescribeTableParams params = new TDescribeTableParams();
     params.setOutput_style(outputStyle_);
@@ -158,6 +168,10 @@ public class DescribeTableStmt extends StatementBase {
     } else {
       Preconditions.checkNotNull(table_);
       params.setTable_name(table_.getTableName().toThrift());
+      if (table_ instanceof IcebergMetadataTable) {
+        params.setMetadata_table_name(((IcebergMetadataTable)table_).
+            getMetadataTableName());
+      }
     }
     return params;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index 656c609c4..306b5af86 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -84,9 +84,6 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     boolean hasJoiningUnnest = false;
     for (int i = 0; i < tableRefs_.size(); ++i) {
       TableRef tblRef = tableRefs_.get(i);
-      if (IcebergMetadataTable.isIcebergMetadataTable(tblRef.getPath())) {
-        analyzer.addMetadataVirtualTable(tblRef.getPath());
-      }
       tblRef = analyzer.resolveTableRef(tblRef);
       tableRefs_.set(i, Preconditions.checkNotNull(tblRef));
       tblRef.setLeftTblRef(leftTblRef);
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableName.java b/fe/src/main/java/org/apache/impala/analysis/TableName.java
index d7cdcc6c6..1f0efc67e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableName.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableName.java
@@ -140,9 +140,10 @@ public class TableName {
   }
 
   public List<String> toPath() {
-    List<String> result = Lists.newArrayListWithCapacity(2);
+    List<String> result = Lists.newArrayListWithCapacity(3);
     if (db_ != null) result.add(db_);
     result.add(tbl_);
+    if (vTbl_ != null && !vTbl_.isEmpty()) result.add(vTbl_);
     return result;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
index 888df7b65..6ce58e5c1 100644
--- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
+++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
@@ -33,6 +33,7 @@ import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.TColumnValue;
@@ -351,4 +352,19 @@ public class DescribeResultFactory {
     }
     return descResult;
   }
-}
+
+  /**
+   * Builds a TDescribeResult for an Iceberg metadata table from an IcebergTable and a
+   * metadata table name.
+   *
+   * This describe request is issued against a VirtualTable which only exists in the
+   * Analyzer's StmtTableCache. Therefore, to get the columns of an IcebergMetadataTable
+   * it is simpler to re-create this object than to extract those from a new
+   * org.apache.iceberg.Table object or to send it over.
+   */
+  public static TDescribeResult buildIcebergMetadataDescribeMinimalResult(FeTable table,
+      String vTableName) throws ImpalaRuntimeException {
+    IcebergMetadataTable metadataTable = new IcebergMetadataTable(table, vTableName);
+    return buildIcebergDescribeMinimalResult(metadataTable.getColumns());
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 31f9a570d..d030655fd 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -127,9 +127,11 @@ import org.apache.impala.catalog.ImpaladTableUsageTracker;
 import org.apache.impala.catalog.MaterializedViewHdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -169,6 +171,7 @@ import org.apache.impala.thrift.TDescribeHistoryParams;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
 import org.apache.impala.thrift.TImpalaTableType;
+import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
 import org.apache.impala.thrift.TIcebergOperation;
 import org.apache.impala.thrift.TExecRequest;
@@ -664,7 +667,8 @@ public class Frontend {
         columns.add(new TColumn("encoding", Type.STRING.toThrift()));
         columns.add(new TColumn("compression", Type.STRING.toThrift()));
         columns.add(new TColumn("block_size", Type.STRING.toThrift()));
-      } else if (descStmt.getTable() instanceof FeIcebergTable
+      } else if ((descStmt.getTable() instanceof FeIcebergTable
+          || descStmt.getTable() instanceof IcebergMetadataTable)
           && descStmt.getOutputStyle() == TDescribeOutputStyle.MINIMAL) {
         columns.add(new TColumn("nullable", Type.STRING.toThrift()));
       }
@@ -1656,24 +1660,34 @@ public class Frontend {
    * Throws an exception if the table or db is not found or if there is an error loading
    * the table metadata.
    */
-  public TDescribeResult describeTable(TTableName tableName,
-      TDescribeOutputStyle outputStyle, User user) throws ImpalaException {
-    RetryTracker retries = new RetryTracker(
-        String.format("fetching table %s.%s", tableName.db_name, tableName.table_name));
-    while (true) {
-      try {
-        return doDescribeTable(tableName, outputStyle, user);
-      } catch(InconsistentMetadataFetchException e) {
-        retries.handleRetryOrThrow(e);
+  public TDescribeResult describeTable(TDescribeTableParams params, User user)
+      throws ImpalaException {
+    if (params.isSetTable_name()) {
+      RetryTracker retries = new RetryTracker(
+          String.format("fetching table %s.%s", params.table_name.db_name,
+              params.table_name.table_name));
+      while (true) {
+        try {
+          return doDescribeTable(params.table_name, params.output_style, user,
+              params.metadata_table_name);
+        } catch(InconsistentMetadataFetchException e) {
+          retries.handleRetryOrThrow(e);
+        }
       }
+    } else {
+      Preconditions.checkState(params.output_style == TDescribeOutputStyle.MINIMAL);
+      Preconditions.checkNotNull(params.result_struct);
+      StructType structType = (StructType)Type.fromThrift(params.result_struct);
+      return DescribeResultFactory.buildDescribeMinimalResult(structType);
     }
   }
 
-  private TDescribeResult doDescribeTable(TTableName tableName,
-      TDescribeOutputStyle outputStyle, User user) throws ImpalaException {
-    FeTable table = getCatalog().getTable(tableName.db_name,
-        tableName.table_name);
-    List<Column> filteredColumns;
+  /**
+   * Filters out columns that the user is not authorized to see.
+   */
+  private List<Column> filterAuthorizedColumnsForDescribeTable(FeTable table, User user)
+      throws InternalException {
+    List<Column> authFilteredColumns;
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
       // First run a table check
       PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
@@ -1681,7 +1695,7 @@ public class Frontend {
           .allOf(Privilege.VIEW_METADATA).onTable(table).build();
       if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
         // Filter out columns that the user is not authorized to see.
-        filteredColumns = new ArrayList<Column>();
+        authFilteredColumns = new ArrayList<Column>();
         for (Column col: table.getColumnsInHiveOrder()) {
           String colName = col.getName();
           privilegeRequest = new PrivilegeRequestBuilder(
@@ -1690,22 +1704,37 @@ public class Frontend {
               .onColumn(table.getDb().getName(),
                   table.getName(), colName, table.getOwnerUser()).build();
           if (authzChecker_.get().hasAccess(user, privilegeRequest)) {
-            filteredColumns.add(col);
+            authFilteredColumns.add(col);
           }
         }
       } else {
         // User has table-level access
-        filteredColumns = table.getColumnsInHiveOrder();
+        authFilteredColumns = table.getColumnsInHiveOrder();
       }
     } else {
       // Authorization is disabled
-      filteredColumns = table.getColumnsInHiveOrder();
+      authFilteredColumns = table.getColumnsInHiveOrder();
     }
+    return authFilteredColumns;
+  }
+
+  private TDescribeResult doDescribeTable(TTableName tableName,
+      TDescribeOutputStyle outputStyle, User user, String vTableName)
+      throws ImpalaException {
+    FeTable table = getCatalog().getTable(tableName.db_name,
+        tableName.table_name);
+    List<Column> filteredColumns = filterAuthorizedColumnsForDescribeTable(table, user);
     if (outputStyle == TDescribeOutputStyle.MINIMAL) {
       if (table instanceof FeKuduTable) {
         return DescribeResultFactory.buildKuduDescribeMinimalResult(filteredColumns);
       } else if (table instanceof FeIcebergTable) {
-        return DescribeResultFactory.buildIcebergDescribeMinimalResult(filteredColumns);
+        if (vTableName == null) {
+          return DescribeResultFactory.buildIcebergDescribeMinimalResult(filteredColumns);
+        } else {
+          Preconditions.checkArgument(vTableName != null);
+          return DescribeResultFactory.buildIcebergMetadataDescribeMinimalResult(table,
+              vTableName);
+        }
       } else {
         return DescribeResultFactory.buildDescribeMinimalResult(
             Column.columnsToStruct(filteredColumns));
@@ -1713,6 +1742,7 @@ public class Frontend {
     } else {
       Preconditions.checkArgument(outputStyle == TDescribeOutputStyle.FORMATTED ||
           outputStyle == TDescribeOutputStyle.EXTENDED);
+      Preconditions.checkArgument(vTableName == null);
       TDescribeResult result = DescribeResultFactory.buildDescribeFormattedResult(table,
           filteredColumns);
       // Filter out LOCATION text
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 1f2213e15..4c6d05f7c 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -495,18 +495,9 @@ public class JniFrontend {
     Preconditions.checkNotNull(frontend_);
     TDescribeTableParams params = new TDescribeTableParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeTableParams);
-
     Preconditions.checkState(params.isSetTable_name() ^ params.isSetResult_struct());
     User user = new User(TSessionStateUtil.getEffectiveUser(params.getSession()));
-    TDescribeResult result = null;
-    if (params.isSetTable_name()) {
-      result = frontend_.describeTable(params.getTable_name(), params.output_style, user);
-    } else {
-      Preconditions.checkState(params.output_style == TDescribeOutputStyle.MINIMAL);
-      StructType structType = (StructType)Type.fromThrift(params.result_struct);
-      result = DescribeResultFactory.buildDescribeMinimalResult(structType);
-    }
-
+    TDescribeResult result = frontend_.describeTable(params, user);
     try {
       TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
index b9b34c6c0..3e551227a 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
@@ -38,6 +38,7 @@ import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
+import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TPrivilegeLevel;
@@ -272,8 +273,11 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
       Preconditions.checkArgument(includedStrings_.length != 0 ||
               excludedStrings_.length != 0,
           "One or both of included or excluded strings must be defined.");
-      List<String> result = resultToStringList(authzFrontend_.describeTable(table,
-          outputStyle_, user_));
+      TDescribeTableParams testParams = new TDescribeTableParams();
+      testParams.setTable_name(table);
+      testParams.setOutput_style(outputStyle_);
+      List<String> result = resultToStringList(authzFrontend_.describeTable(testParams,
+          user_));
       for (String str: includedStrings_) {
         assertTrue(String.format("\"%s\" is not in the describe output.\n" +
                 "Expected : %s\n Actual   : %s", str, Arrays.toString(includedStrings_),
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index c34f3aacc..0f58dae99 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -420,9 +420,9 @@ INT,STRING,BIGINT
 ####
 ====
 ---- QUERY
-describe functional_parquet.iceberg_query_metadata.snapshots;
+describe formatted functional_parquet.iceberg_query_metadata.snapshots;
 ---- CATCH
-AnalysisException: Could not resolve path: 'functional_parquet.iceberg_query_metadata.snapshots'
+AnalysisException: DESCRIBE FORMATTED|EXTENDED cannot refer to a metadata table.
 ====
 ---- QUERY
 show create table functional_parquet.iceberg_query_metadata.snapshots;
@@ -456,7 +456,7 @@ ParseException: Syntax error in line 1
 ====
 
 ####
-# Test 9 : Query STRUCT type columns
+# Test 9 : Query nested type columns
 ####
 ====
 ---- QUERY
@@ -550,4 +550,264 @@ as select equality_ids from functional_parquet.iceberg_query_metadata.all_files;
 select item from iceberg_query_metadata_all_files a, a.equality_ids e, e.delete_ids;
 ---- CATCH
 AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
-====
\ No newline at end of file
+====
+
+####
+# Test 10 : Describe all the metadata tables once
+####
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.snapshots;
+---- RESULTS
+'committed_at','timestamp','','false'
+'snapshot_id','bigint','','false'
+'parent_id','bigint','','true'
+'operation','string','','true'
+'manifest_list','string','','true'
+'summary','map<string,string>','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.`files`;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.data_files;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.delete_files;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.history;
+---- RESULTS
+'made_current_at','timestamp','','false'
+'snapshot_id','bigint','','false'
+'parent_id','bigint','','true'
+'is_current_ancestor','boolean','','false'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.metadata_log_entries;
+---- RESULTS
+'timestamp','timestamp','','false'
+'file','string','','false'
+'latest_snapshot_id','bigint','','true'
+'latest_schema_id','int','','true'
+'latest_sequence_number','bigint','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.snapshots;
+---- RESULTS
+'committed_at','timestamp','','false'
+'snapshot_id','bigint','','false'
+'parent_id','bigint','','true'
+'operation','string','','true'
+'manifest_list','string','','true'
+'summary','map<string,string>','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.refs;
+---- RESULTS
+'name','string','','false'
+'type','string','','false'
+'snapshot_id','bigint','','false'
+'max_reference_age_in_ms','bigint','','true'
+'min_snapshots_to_keep','int','','true'
+'max_snapshot_age_in_ms','bigint','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.manifests;
+---- RESULTS
+'content','int','','false'
+'path','string','','false'
+'length','bigint','','false'
+'partition_spec_id','int','','false'
+'added_snapshot_id','bigint','','false'
+'added_data_files_count','int','','false'
+'existing_data_files_count','int','','false'
+'deleted_data_files_count','int','','false'
+'added_delete_files_count','int','','false'
+'existing_delete_files_count','int','','false'
+'deleted_delete_files_count','int','','false'
+'partition_summaries','array<struct<\n  contains_null:boolean,\n  contains_nan:boolean,\n  lower_bound:string,\n  upper_bound:string\n>>','','false'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.`partitions`;
+---- RESULTS
+'record_count','bigint','Count of records in data files','false'
+'file_count','int','Count of data files','false'
+'position_delete_record_count','bigint','Count of records in position delete files','false'
+'position_delete_file_count','int','Count of position delete files','false'
+'equality_delete_record_count','bigint','Count of records in equality delete files','false'
+'equality_delete_file_count','int','Count of equality delete files','false'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.all_data_files;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.all_delete_files;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.all_files;
+---- RESULTS
+'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
+'file_path','string','Location URI with FS scheme','false'
+'file_format','string','File format name: avro, orc, or parquet','false'
+'spec_id','int','Partition spec ID','true'
+'record_count','bigint','Number of records in the file','false'
+'file_size_in_bytes','bigint','Total file size in bytes','false'
+'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
+'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
+'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
+'nan_value_counts','map<int,bigint>','Map of column id to number of NaN values in the column','true'
+'lower_bounds','map<int,binary>','Map of column id to lower bound','true'
+'upper_bounds','map<int,binary>','Map of column id to upper bound','true'
+'key_metadata','binary','Encryption key metadata blob','true'
+'split_offsets','array<bigint>','Splittable offsets','true'
+'equality_ids','array<int>','Equality comparison field IDs','true'
+'sort_order_id','int','Sort order ID','true'
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.all_manifests;
+---- RESULTS
+'content','int','','false'
+'path','string','','false'
+'length','bigint','','false'
+'partition_spec_id','int','','true'
+'added_snapshot_id','bigint','','true'
+'added_data_files_count','int','','true'
+'existing_data_files_count','int','','true'
+'deleted_data_files_count','int','','true'
+'added_delete_files_count','int','','false'
+'existing_delete_files_count','int','','false'
+'deleted_delete_files_count','int','','false'
+'partition_summaries','array<struct<\n  contains_null:boolean,\n  contains_nan:boolean,\n  lower_bound:string,\n  upper_bound:string\n>>','','true'
+'reference_snapshot_id','bigint','','false'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+describe functional_parquet.iceberg_query_metadata.all_entries;
+---- RESULTS
+'status','int','','false'
+'snapshot_id','bigint','','true'
+'sequence_number','bigint','','true'
+'file_sequence_number','bigint','','true'
+'data_file','struct<\n  content:int comment ''contents of the file: 0=data, 1=position deletes, 2=equality deletes'',\n  file_path:string comment ''location uri with fs scheme'',\n  file_format:string comment ''file format name: avro, orc, or parquet'',\n  spec_id:int comment ''partition spec id'',\n  record_count:bigint comment ''number of records in the file'',\n  file_size_in_bytes:bigint comment ''total file size in bytes'',\n  column_sizes:map<int,bigint> comment ''map of column id  [...]
+'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====


(impala) 04/05: IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations

Posted by wz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d5307418b57efd16196cf0538095339d3b8d96d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Jan 10 13:58:38 2024 +0800

    IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations
    
    In-flight catalog operations are tracked in a map using query id as the
    key. It's ok since catalog clients use 0 as the timeout by default (see
    --catalog_client_rpc_timeout_ms), i.e. catalog RPCs never timeout, which
    means each query will have at most one in-flight catalog RPC at a time.
    
    However, in case catalog_client_rpc_timeout_ms is set to non-zero,
    impalad could retry the catalog RPC when it's considered timed out. That
    causes several in-flight catalog operations coming from the same query
    (so using the same query-id as the map key).
    
    To fix the key conflicts, this patch use the pair of (queryId, threadId)
    as the key of the in-flight operations map. 'threadId' comes from the
    thrift thread that handles the RPC so it's unique across different
    retries.
    
    Tests:
     - Add custom-cluster test to verify all retries are shown in the
       /operations page.
    
    Change-Id: Icd94ac7532fe7f3d68028c2da82298037be706c4
    Reviewed-on: http://gerrit.cloudera.org:8080/20877
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../catalog/monitor/CatalogOperationTracker.java   | 115 ++++++++++++++-------
 tests/custom_cluster/test_web_pages.py             |  70 +++++++++++--
 2 files changed, 136 insertions(+), 49 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
index 47dc8d18f..f66f0cfd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -29,6 +29,9 @@ import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,29 +51,59 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  *  are also kept in memory and the size is controlled by 'catalog_operation_log_size'.
  */
 public final class CatalogOperationTracker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CatalogOperationTracker.class);
   public final static CatalogOperationTracker INSTANCE = new CatalogOperationTracker();
 
   // Keeps track of the on-going DDL operations
-  CatalogDdlCounter catalogDdlCounter;
+  CatalogDdlCounter catalogDdlCounter_;
 
   // Keeps track of the on-going reset metadata requests (refresh/invalidate)
-  CatalogResetMetadataCounter catalogResetMetadataCounter;
+  CatalogResetMetadataCounter catalogResetMetadataCounter_;
 
   // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
-  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter_;
 
-  private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+  /**
+   * Key to track in-flight catalog operations. Each operation is triggered by an RPC.
+   * Each RPC is identified by the query id and the thrift thread id that handles it.
+   * Note that the thread id is important to identify different RPC retries.
+   */
+  private static class RpcKey {
+    private final TUniqueId queryId_;
+    private final long threadId_;
+
+    public RpcKey(TUniqueId queryId) {
+      queryId_ = queryId;
+      threadId_ = Thread.currentThread().getId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof RpcKey)) return false;
+      RpcKey key = (RpcKey) o;
+      return queryId_.equals(key.queryId_) && threadId_ == key.threadId_;
+    }
+
+    @Override
+    public int hashCode() {
+      return queryId_.hashCode() * 31 + Long.hashCode(threadId_);
+    }
+  }
+
+  private final Map<RpcKey, TCatalogOpRecord> inFlightOperations_ =
       new ConcurrentHashMap<>();
-  private final Queue<TCatalogOpRecord> finishedOperations =
+  private final Queue<TCatalogOpRecord> finishedOperations_ =
       new ConcurrentLinkedQueue<>();
-  private final int catalogOperationLogSize;
+  private final int catalogOperationLogSize_;
 
   private CatalogOperationTracker() {
-    catalogDdlCounter = new CatalogDdlCounter();
-    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
-    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
-    catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
-    Preconditions.checkState(catalogOperationLogSize >= 0);
+    catalogDdlCounter_ = new CatalogDdlCounter();
+    catalogResetMetadataCounter_ = new CatalogResetMetadataCounter();
+    catalogFinalizeDmlCounter_ = new CatalogFinalizeDmlCounter();
+    catalogOperationLogSize_ = BackendConfig.INSTANCE.catalogOperationLogSize();
+    Preconditions.checkState(catalogOperationLogSize_ >= 0);
   }
 
   private void addRecord(TCatalogServiceRequestHeader header,
@@ -91,29 +124,33 @@ public final class CatalogOperationTracker {
     if (queryId != null) {
       TCatalogOpRecord record = new TCatalogOpRecord(Thread.currentThread().getId(),
           queryId, clientIp, coordinator, catalogOpName,
-          catalogDdlCounter.getTableName(tTableName), user,
+          catalogDdlCounter_.getTableName(tTableName), user,
           System.currentTimeMillis(), -1, "STARTED", details);
-      inFlightOperations.put(queryId, record);
+      inFlightOperations_.put(new RpcKey(queryId), record);
     }
   }
 
   private void archiveRecord(TUniqueId queryId, String errorMsg) {
-    if (queryId != null && inFlightOperations.containsKey(queryId)) {
-      TCatalogOpRecord record = inFlightOperations.remove(queryId);
-      if (catalogOperationLogSize == 0) return;
-      record.setFinish_time_ms(System.currentTimeMillis());
-      if (errorMsg != null) {
-        record.setStatus("FAILED");
-        record.setDetails(record.getDetails() + ", error=" + errorMsg);
-      } else {
-        record.setStatus("FINISHED");
-      }
-      synchronized (finishedOperations) {
-        if (finishedOperations.size() >= catalogOperationLogSize) {
-          finishedOperations.poll();
-        }
-        finishedOperations.add(record);
+    if (queryId == null) return;
+    RpcKey key = new RpcKey(queryId);
+    TCatalogOpRecord record = inFlightOperations_.remove(key);
+    if (record == null) {
+      LOG.error("Null record for query {}", TUniqueIdUtil.PrintId(queryId));
+      return;
+    }
+    if (catalogOperationLogSize_ == 0) return;
+    record.setFinish_time_ms(System.currentTimeMillis());
+    if (errorMsg != null) {
+      record.setStatus("FAILED");
+      record.setDetails(record.getDetails() + ", error=" + errorMsg);
+    } else {
+      record.setStatus("FINISHED");
+    }
+    synchronized (finishedOperations_) {
+      if (finishedOperations_.size() >= catalogOperationLogSize_) {
+        finishedOperations_.poll();
       }
+      finishedOperations_.add(record);
     }
   }
 
@@ -129,13 +166,13 @@ public final class CatalogOperationTracker {
       String details = "query_options=" + ddlRequest.query_options.toString();
       addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName, details);
     }
-    catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+    catalogDdlCounter_.incrementOperation(ddlRequest.ddl_type, tTableName);
   }
 
   public void decrement(TDdlType tDdlType, TUniqueId queryId,
       Optional<TTableName> tTableName, String errorMsg) {
     archiveRecord(queryId, errorMsg);
-    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+    catalogDdlCounter_.decrementOperation(tDdlType, tTableName);
   }
 
   public void increment(TResetMetadataRequest req) {
@@ -152,14 +189,14 @@ public final class CatalogOperationTracker {
           CatalogResetMetadataCounter.getResetMetadataType(req, tTableName).name(),
           tTableName, details);
     }
-    catalogResetMetadataCounter.incrementOperation(req);
+    catalogResetMetadataCounter_.incrementOperation(req);
   }
 
   public void decrement(TResetMetadataRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogResetMetadataCounter.decrementOperation(req);
+    catalogResetMetadataCounter_.decrementOperation(req);
   }
 
   public void increment(TUpdateCatalogRequest req) {
@@ -181,14 +218,14 @@ public final class CatalogOperationTracker {
           CatalogFinalizeDmlCounter.getDmlType(req.getHeader().redacted_sql_stmt).name(),
           tTableName, details);
     }
-    catalogFinalizeDmlCounter.incrementOperation(req);
+    catalogFinalizeDmlCounter_.incrementOperation(req);
   }
 
   public void decrement(TUpdateCatalogRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogFinalizeDmlCounter.decrementOperation(req);
+    catalogFinalizeDmlCounter_.decrementOperation(req);
   }
 
   /**
@@ -197,14 +234,14 @@ public final class CatalogOperationTracker {
    */
   public TGetOperationUsageResponse getOperationMetrics() {
     List<TOperationUsageCounter> merged = new ArrayList<>();
-    merged.addAll(catalogDdlCounter.getOperationUsage());
-    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
-    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+    merged.addAll(catalogDdlCounter_.getOperationUsage());
+    merged.addAll(catalogResetMetadataCounter_.getOperationUsage());
+    merged.addAll(catalogFinalizeDmlCounter_.getOperationUsage());
     TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
-    for (TCatalogOpRecord record : inFlightOperations.values()) {
+    for (TCatalogOpRecord record : inFlightOperations_.values()) {
       res.addToIn_flight_catalog_operations(record);
     }
-    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations);
+    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations_);
     // Reverse the list to show recent operations first.
     Collections.reverse(records);
     res.setFinished_catalog_operations(records);
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2644f70a2..2c5b87015 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -22,7 +22,9 @@ import re
 import requests
 import psutil
 import pytest
+import time
 
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE,
   CustomClusterTestSuite)
@@ -260,6 +262,22 @@ class TestWebPage(CustomClusterTestSuite):
       assert 'Content-Security-Policy' not in response.headers, \
         "CSP header present despite being disabled (port %s)" % port
 
+  @staticmethod
+  def _get_inflight_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "inflight_catalog_operations" in operations
+    return operations["inflight_catalog_operations"]
+
+  @staticmethod
+  def _get_finished_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "finished_catalog_operations" in operations
+    return operations["finished_catalog_operations"]
+
   @CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
   def test_catalog_operations_limit(self, unique_database):
     tbl = unique_database + ".tbl"
@@ -267,11 +285,7 @@ class TestWebPage(CustomClusterTestSuite):
     self.execute_query("create table {0}_2 (id int)".format(tbl))
     self.execute_query("create table {0}_3 (id int)".format(tbl))
     self.execute_query("drop table {0}_1".format(tbl))
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify only 2 operations are shown
     assert len(finished_operations) == 2
     op = finished_operations[0]
@@ -293,11 +307,7 @@ class TestWebPage(CustomClusterTestSuite):
     num = 500
     for i in range(num):
       self.execute_query("invalidate metadata " + tbl)
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify all operations are in the history. There are one DROP_DATABASE, one
     # CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
     assert len(finished_operations) == 3 + num
@@ -319,6 +329,46 @@ class TestWebPage(CustomClusterTestSuite):
     assert op["catalog_op_name"] == "DROP_DATABASE"
     assert op["target_name"] == unique_database
 
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=10 "
+                 "--catalog_client_connection_num_retries=2")
+  def test_catalog_operations_with_rpc_retry(self):
+    """Test that catalog RPC retries are all shown in the /operations page"""
+    # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
+    # time out in its real work.
+    self.execute_query("describe functional.alltypes")
+    try:
+      self.execute_query("refresh functional.alltypes", {
+        "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+      })
+    except ImpalaBeeswaxException as e:
+      assert "RPC recv timed out" in str(e)
+    # In impalad side, the query fails by the above error. However, in catalogd side,
+    # the RPCs are still running. Check the in-flight operations.
+    inflight_operations = self._get_inflight_catalog_operations()
+    assert len(inflight_operations) == 2
+    for op in inflight_operations:
+      assert op["status"] == "STARTED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert inflight_operations[0]["query_id"] == inflight_operations[1]["query_id"]
+    assert inflight_operations[0]["thread_id"] != inflight_operations[1]["thread_id"]
+
+    # Wait until the catalog operations finish
+    while len(self._get_inflight_catalog_operations()) != 0:
+      time.sleep(1)
+
+    # Verify both RPC attempts are shown as finished operations.
+    finished_operations = self._get_finished_catalog_operations()
+    assert len(finished_operations) == 2
+    for op in finished_operations:
+      assert op["status"] == "FINISHED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert finished_operations[0]["query_id"] == finished_operations[1]["query_id"]
+    assert finished_operations[0]["thread_id"] != finished_operations[1]["thread_id"]
+
   def _verify_topic_size_metrics(self):
     # Calculate the total topic metrics from the /topics page
     response = requests.get("http://localhost:25010/topics?json")