You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/19 06:39:57 UTC

(impala) branch master updated (b372f87b6 -> 5dfcdf1c9)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from b372f87b6 IMPALA-12708: An UPDATE creates 2 new snapshots in Iceberg tables
     new 526d6eb68 IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
     new 5dfcdf1c9 IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impala/catalog/events/MetastoreEvents.java     | 53 ++++++++++++++++----
 tests/custom_cluster/test_events_custom_configs.py | 58 ++++++++++++++++++++++
 tests/custom_cluster/test_web_pages.py             |  8 +--
 3 files changed, 106 insertions(+), 13 deletions(-)


(impala) 01/02: IMPALA-12719: Reload filemetadata for AlterTable event of type truncate

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 526d6eb68f202c5172d608f661979a40a92c2261
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Jan 11 14:01:31 2024 -0800

    IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
    
    When the event processor receives an alter table event and the event
    type is of truncate operation currently file metadata is not reloaded.
    This patch addresses this issue, where the alter table event type is
    verified if it's a truncate operation and then reload the file metadata
    accordingly.
    
    Note: Alter table event for an external table generated by the truncate
    operation from Impala cannot be identified if it's a truncate op or not
    This becomes an issue in multi cluster Impala environments where events
    generated from one impala cluster is consumed by other impala clusters.
    Truncate operations in Impala on replicated tables will generated alter
    event with 'isTruncateOp' field set to true.
    
    Testing:
    Added an end-to-end test to verify whether file metadata is reloaded
    for the above scenario.
    
    Change-Id: I53bb80c294623eec7c79d9f30f410771386c6b75
    Reviewed-on: http://gerrit.cloudera.org:8080/20887
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 53 ++++++++++++++++----
 tests/custom_cluster/test_events_custom_configs.py | 58 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 6dcd86420..887404d36 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1451,6 +1451,8 @@ public class MetastoreEvents {
     private final Boolean eventSyncAfterFlag_;
     // value of the db flag at the time of event creation
     private final boolean dbFlagVal;
+    // true if this alter event was due to a truncate operation in metastore
+    private final boolean isTruncateOp_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1467,6 +1469,7 @@ public class MetastoreEvents {
         msTbl_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
         tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
         tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+        isTruncateOp_ = alterTableMessage.getIsTruncateOp();
       } catch (Exception e) {
         throw new MetastoreNotificationException(
             debugString("Unable to parse the alter table message"), e);
@@ -1563,7 +1566,8 @@ public class MetastoreEvents {
             + "which can be ignored.");
         return;
       }
-      skipFileMetadataReload_ = canSkipFileMetadataReload(tableBefore_, tableAfter_);
+      skipFileMetadataReload_ = !isTruncateOp_ && canSkipFileMetadataReload(tableBefore_,
+          tableAfter_);
       long startNs = System.nanoTime();
       if (wasEventSyncTurnedOn()) {
         handleEventSyncTurnedOn();
@@ -1716,6 +1720,11 @@ public class MetastoreEvents {
       // event by setting those parameters equal before and after the event and
       // comparing the objects.
 
+      // alter table event from truncate ops always can't be skipped.
+      if (isTruncateOp_) {
+        return false;
+      }
+
       // Avoid modifying the object from event.
       org.apache.hadoop.hive.metastore.api.Table tblAfter = tableAfter_.deepCopy();
       setTrivialParameters(tableBefore_.getParameters(), tblAfter.getParameters());
@@ -2190,6 +2199,8 @@ public class MetastoreEvents {
     private final long versionNumberFromEvent_;
     // the service id from the partition parameters of the event.
     private final String serviceIdFromEvent_;
+    // true if this alter event was due to a truncate operation in metastore
+    private final boolean isTruncateOp_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -2208,6 +2219,7 @@ public class MetastoreEvents {
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
         partitionAfter_ =
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
+        isTruncateOp_ = alterPartitionMessage.getIsTruncateOp();
         msTbl_ = alterPartitionMessage.getTableObj();
         Map<String, String> parameters = partitionAfter_.getParameters();
         versionNumberFromEvent_ = Long.parseLong(
@@ -2302,9 +2314,13 @@ public class MetastoreEvents {
             partitionAfter_);
         try {
           // load file metadata only if storage descriptor of partitionAfter_ differs
-          // from sd of HdfsPartition
-          reloadPartitions(Arrays.asList(partitionAfter_),
-              FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, "ALTER_PARTITION event");
+          // from sd of HdfsPartition. If the alter_partition event type is of truncate
+          // then force load the file metadata.
+          FileMetadataLoadOpts fileMetadataLoadOpts =
+              isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
+                  FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
+          reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts,
+              "ALTER_PARTITION event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(
               debugString("Refresh partition on table {} partition {} failed. Event " +
@@ -2323,6 +2339,11 @@ public class MetastoreEvents {
       // event by setting those parameters equal before and after the event and
       // comparing the objects.
 
+      // alter partition event from truncate ops always can't be skipped.
+      if (isTruncateOp_) {
+        return false;
+      }
+
       // Avoid modifying the object from event.
       Partition afterPartition = partitionAfter_.deepCopy();
       setTrivialParameters(partitionBefore_.getParameters(),
@@ -2426,17 +2447,22 @@ public class MetastoreEvents {
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
+      List<Partition> partitionEventsToForceReload = new ArrayList<>();
       for (T event : batchedEvents_) {
         if (isOlderEvent(event.getPartitionForBatching())) {
           infoLog("Not processing the current event id {} as it is an older event",
               event.getEventId());
           continue;
         }
-        if (!event.canBeSkipped()) {
+        boolean isTruncateOp = (event instanceof AlterPartitionEvent &&
+            ((AlterPartitionEvent)event).isTruncateOp_);
+        if (isTruncateOp) {
+          partitionEventsToForceReload.add(event.getPartitionForBatching());
+        } else if (!event.canBeSkipped()){
           eventsToProcess.add(event);
         }
       }
-      if (eventsToProcess.isEmpty()) {
+      if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) {
         LOG.info(
             "Ignoring events from event id {} to {} since they modify parameters "
             + " which can be ignored", getFirstEventId(), getLastEventId());
@@ -2459,10 +2485,17 @@ public class MetastoreEvents {
             reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD,
                 getEventType().toString() + " event");
           } else {
-            // alter partition event. Reload file metadata of only those partitions
-            // for which sd has changed
-            reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
-                getEventType().toString() + " event");
+            if (!partitionEventsToForceReload.isEmpty()) {
+              // force reload truncated partitions
+              reloadPartitions(partitionEventsToForceReload,
+                  FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + " event");
+            }
+            if (!partitions.isEmpty()) {
+              // alter partition event. Reload file metadata of only those partitions
+              // for which sd has changed
+              reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
+                  getEventType().toString() + " event");
+            }
           }
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(String.format(
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index aaebde2b9..09ab84b58 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1086,3 +1086,61 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       assert parts_added_before == parts_added_after
       assert parts_refreshed_before == parts_refreshed_after
       assert events_skipped_after > events_skipped_before
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
+  def test_truncate_table_from_hive(self, unique_database):
+    """IMPALA-12636: verify truncate table from hive reloads file metadata in Impala"""
+    hive_tbl = "tbl_in_hive"
+    values = "values (10),(20),(30)"
+
+    def verify_truncate_op_in_hive(tbl_name, is_transactional, is_partitioned,
+        is_batched):
+      create_query = " ".join(["create", "table `{}`.`{}` (i int)",
+        " partitioned by (year int) " if is_partitioned else '',
+          self.__get_transactional_tblproperties(is_transactional)])
+      self.execute_query(create_query.format(unique_database, tbl_name))
+      insert_query = " ".join(["insert into `{}`.`{}`", "partition (year=2024)"
+        if is_partitioned else '', values])
+      self.run_stmt_in_hive(insert_query.format(unique_database, tbl_name))
+      EventProcessorUtils.wait_for_event_processing(self)
+      self.client.execute("refresh {}.{}".format(unique_database, tbl_name))
+      truncate_query = " ".join(["truncate table `{}`.`{}`", "partition (year=2024)"
+        if is_partitioned else ''])
+      self.run_stmt_in_hive(truncate_query.format(unique_database, tbl_name))
+      if is_batched:
+        self.run_stmt_in_hive(
+          "insert into {}.{} partition (year=2024) values (1),(2)"
+          .format(unique_database, tbl_name))
+      EventProcessorUtils.wait_for_event_processing(self)
+      data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+        unique_database, tbl_name)))
+      assert data == 2 if is_batched else data == 0
+      self.client.execute("drop table {}.{}".format(unique_database, tbl_name))
+    # Case-I: truncate single partition
+    verify_truncate_op_in_hive(hive_tbl, False, False, False)
+    verify_truncate_op_in_hive(hive_tbl, True, False, False)
+    verify_truncate_op_in_hive(hive_tbl, False, True, False)
+    verify_truncate_op_in_hive(hive_tbl, False, True, True)
+    verify_truncate_op_in_hive(hive_tbl, True, True, False)
+    verify_truncate_op_in_hive(hive_tbl, True, True, True)
+
+    # Case-II: truncate partition in multi partition
+    hive_tbl = "multi_part_tbl"
+    self.client.execute("create table {}.{} (i int) partitioned by "
+      "(p int, q int)".format(unique_database, hive_tbl))
+    self.client.execute("insert into {}.{} partition(p, q) values "
+      "(0,0,0), (0,0,1), (0,0,2)".format(unique_database, hive_tbl))
+    self.client.execute("insert into {}.{} partition(p, q) values "
+      "(0,1,0), (0,1,1)".format(unique_database, hive_tbl))
+    self.run_stmt_in_hive("truncate table {}.{} partition(p=0)"
+      .format(unique_database, hive_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+      unique_database, hive_tbl)))
+    assert data == 2
+    self.run_stmt_in_hive("truncate table {}.{}"
+      .format(unique_database, hive_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+      unique_database, hive_tbl)))
+    assert data == 0


(impala) 02/02: IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5dfcdf1c9538b69e25b9bde1e86c209595b218b5
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jan 19 07:36:19 2024 +0800

    IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry
    
    test_catalog_operations_with_rpc_retry uses a short timeout which could
    lead to failures in the first DESCRIBE statement. What it expects is the
    next REFRESH statement failed by the RPC timeout.
    
    The DESCRIBE statement triggers a catalog RPC of PrioritizeLoad on the
    table. The RPC usually finishes in 40ms. This patch bumps the RPC
    timeout of the test to be 100ms so it's long enough for DESCRIBE to
    succeed. Also bumps the sleep time in the REFRESH statement so its
    ExecDdl RPC will always time out in 100ms.
    
    Tests:
     - Ran the test till night (2700 times) and all passed.
    
    Change-Id: Ibbfa79d7f7530af4cfbb6f8ebc55e8267e3d3261
    Reviewed-on: http://gerrit.cloudera.org:8080/20924
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_web_pages.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2c5b87015..2bd40764a 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -330,17 +330,19 @@ class TestWebPage(CustomClusterTestSuite):
     assert op["target_name"] == unique_database
 
   @CustomClusterTestSuite.with_args(
-    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+    impalad_args="--catalog_client_rpc_timeout_ms=100 "
                  "--catalog_client_rpc_retry_interval_ms=10 "
                  "--catalog_client_connection_num_retries=2")
   def test_catalog_operations_with_rpc_retry(self):
     """Test that catalog RPC retries are all shown in the /operations page"""
     # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
-    # time out in its real work.
+    # time out in its real work. This triggers a PrioritizeLoad RPC which usually
+    # finishes in 40ms. So 100ms for catalog RPC timeout is enough.
     self.execute_query("describe functional.alltypes")
     try:
+      # This runs around 600ms with the debug action so the catalog RPC will timeout.
       self.execute_query("refresh functional.alltypes", {
-        "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+        "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@100"
       })
     except ImpalaBeeswaxException as e:
       assert "RPC recv timed out" in str(e)