You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/19 06:39:58 UTC

(impala) 01/02: IMPALA-12719: Reload filemetadata for AlterTable event of type truncate

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 526d6eb68f202c5172d608f661979a40a92c2261
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Jan 11 14:01:31 2024 -0800

    IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
    
    Currently, when the event processor receives an alter table event
    generated by a truncate operation, the file metadata is not reloaded.
    This patch fixes the issue by checking whether an alter table (or
    alter partition) event was generated by a truncate operation and, if
    so, reloading the file metadata.
    
    Note: An alter table event for an external table that is generated by
    a truncate operation from Impala cannot be identified as a truncate
    operation. This becomes an issue in multi-cluster Impala environments,
    where events generated by one Impala cluster are consumed by other
    Impala clusters. Truncate operations in Impala on replicated tables
    generate alter events with the 'isTruncateOp' field set to true.
    
    Testing:
    Added an end-to-end test to verify whether file metadata is reloaded
    for the above scenario.
    
    Change-Id: I53bb80c294623eec7c79d9f30f410771386c6b75
    Reviewed-on: http://gerrit.cloudera.org:8080/20887
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 53 ++++++++++++++++----
 tests/custom_cluster/test_events_custom_configs.py | 58 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 6dcd86420..887404d36 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1451,6 +1451,8 @@ public class MetastoreEvents {
     private final Boolean eventSyncAfterFlag_;
     // value of the db flag at the time of event creation
     private final boolean dbFlagVal;
+    // true if this alter event was due to a truncate operation in metastore
+    private final boolean isTruncateOp_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1467,6 +1469,7 @@ public class MetastoreEvents {
         msTbl_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
         tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
         tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+        isTruncateOp_ = alterTableMessage.getIsTruncateOp();
       } catch (Exception e) {
         throw new MetastoreNotificationException(
             debugString("Unable to parse the alter table message"), e);
@@ -1563,7 +1566,8 @@ public class MetastoreEvents {
             + "which can be ignored.");
         return;
       }
-      skipFileMetadataReload_ = canSkipFileMetadataReload(tableBefore_, tableAfter_);
+      skipFileMetadataReload_ = !isTruncateOp_ && canSkipFileMetadataReload(tableBefore_,
+          tableAfter_);
       long startNs = System.nanoTime();
       if (wasEventSyncTurnedOn()) {
         handleEventSyncTurnedOn();
@@ -1716,6 +1720,11 @@ public class MetastoreEvents {
       // event by setting those parameters equal before and after the event and
       // comparing the objects.
 
+      // alter table event from truncate ops always can't be skipped.
+      if (isTruncateOp_) {
+        return false;
+      }
+
       // Avoid modifying the object from event.
       org.apache.hadoop.hive.metastore.api.Table tblAfter = tableAfter_.deepCopy();
       setTrivialParameters(tableBefore_.getParameters(), tblAfter.getParameters());
@@ -2190,6 +2199,8 @@ public class MetastoreEvents {
     private final long versionNumberFromEvent_;
     // the service id from the partition parameters of the event.
     private final String serviceIdFromEvent_;
+    // true if this alter event was due to a truncate operation in metastore
+    private final boolean isTruncateOp_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -2208,6 +2219,7 @@ public class MetastoreEvents {
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
         partitionAfter_ =
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
+        isTruncateOp_ = alterPartitionMessage.getIsTruncateOp();
         msTbl_ = alterPartitionMessage.getTableObj();
         Map<String, String> parameters = partitionAfter_.getParameters();
         versionNumberFromEvent_ = Long.parseLong(
@@ -2302,9 +2314,13 @@ public class MetastoreEvents {
             partitionAfter_);
         try {
           // load file metadata only if storage descriptor of partitionAfter_ differs
-          // from sd of HdfsPartition
-          reloadPartitions(Arrays.asList(partitionAfter_),
-              FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, "ALTER_PARTITION event");
+          // from sd of HdfsPartition. If the alter_partition event type is of truncate
+          // then force load the file metadata.
+          FileMetadataLoadOpts fileMetadataLoadOpts =
+              isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
+                  FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
+          reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts,
+              "ALTER_PARTITION event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(
               debugString("Refresh partition on table {} partition {} failed. Event " +
@@ -2323,6 +2339,11 @@ public class MetastoreEvents {
       // event by setting those parameters equal before and after the event and
       // comparing the objects.
 
+      // alter partition event from truncate ops always can't be skipped.
+      if (isTruncateOp_) {
+        return false;
+      }
+
       // Avoid modifying the object from event.
       Partition afterPartition = partitionAfter_.deepCopy();
       setTrivialParameters(partitionBefore_.getParameters(),
@@ -2426,17 +2447,22 @@ public class MetastoreEvents {
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
+      List<Partition> partitionEventsToForceReload = new ArrayList<>();
       for (T event : batchedEvents_) {
         if (isOlderEvent(event.getPartitionForBatching())) {
           infoLog("Not processing the current event id {} as it is an older event",
               event.getEventId());
           continue;
         }
-        if (!event.canBeSkipped()) {
+        boolean isTruncateOp = (event instanceof AlterPartitionEvent &&
+            ((AlterPartitionEvent)event).isTruncateOp_);
+        if (isTruncateOp) {
+          partitionEventsToForceReload.add(event.getPartitionForBatching());
+        } else if (!event.canBeSkipped()){
           eventsToProcess.add(event);
         }
       }
-      if (eventsToProcess.isEmpty()) {
+      if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) {
         LOG.info(
             "Ignoring events from event id {} to {} since they modify parameters "
             + " which can be ignored", getFirstEventId(), getLastEventId());
@@ -2459,10 +2485,17 @@ public class MetastoreEvents {
             reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD,
                 getEventType().toString() + " event");
           } else {
-            // alter partition event. Reload file metadata of only those partitions
-            // for which sd has changed
-            reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
-                getEventType().toString() + " event");
+            if (!partitionEventsToForceReload.isEmpty()) {
+              // force reload truncated partitions
+              reloadPartitions(partitionEventsToForceReload,
+                  FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + " event");
+            }
+            if (!partitions.isEmpty()) {
+              // alter partition event. Reload file metadata of only those partitions
+              // for which sd has changed
+              reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
+                  getEventType().toString() + " event");
+            }
           }
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(String.format(
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index aaebde2b9..09ab84b58 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1086,3 +1086,61 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       assert parts_added_before == parts_added_after
       assert parts_refreshed_before == parts_refreshed_after
       assert events_skipped_after > events_skipped_before
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
+  def test_truncate_table_from_hive(self, unique_database):
+    """IMPALA-12636: verify truncate table from hive reloads file metadata in Impala"""
+    hive_tbl = "tbl_in_hive"
+    values = "values (10),(20),(30)"
+
+    def verify_truncate_op_in_hive(tbl_name, is_transactional, is_partitioned,
+        is_batched):
+      create_query = " ".join(["create", "table `{}`.`{}` (i int)",
+        " partitioned by (year int) " if is_partitioned else '',
+          self.__get_transactional_tblproperties(is_transactional)])
+      self.execute_query(create_query.format(unique_database, tbl_name))
+      insert_query = " ".join(["insert into `{}`.`{}`", "partition (year=2024)"
+        if is_partitioned else '', values])
+      self.run_stmt_in_hive(insert_query.format(unique_database, tbl_name))
+      EventProcessorUtils.wait_for_event_processing(self)
+      self.client.execute("refresh {}.{}".format(unique_database, tbl_name))
+      truncate_query = " ".join(["truncate table `{}`.`{}`", "partition (year=2024)"
+        if is_partitioned else ''])
+      self.run_stmt_in_hive(truncate_query.format(unique_database, tbl_name))
+      if is_batched:
+        self.run_stmt_in_hive(
+          "insert into {}.{} partition (year=2024) values (1),(2)"
+          .format(unique_database, tbl_name))
+      EventProcessorUtils.wait_for_event_processing(self)
+      data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+        unique_database, tbl_name)))
+      assert data == 2 if is_batched else data == 0
+      self.client.execute("drop table {}.{}".format(unique_database, tbl_name))
+    # Case-I: truncate single partition
+    verify_truncate_op_in_hive(hive_tbl, False, False, False)
+    verify_truncate_op_in_hive(hive_tbl, True, False, False)
+    verify_truncate_op_in_hive(hive_tbl, False, True, False)
+    verify_truncate_op_in_hive(hive_tbl, False, True, True)
+    verify_truncate_op_in_hive(hive_tbl, True, True, False)
+    verify_truncate_op_in_hive(hive_tbl, True, True, True)
+
+    # Case-II: truncate partition in multi partition
+    hive_tbl = "multi_part_tbl"
+    self.client.execute("create table {}.{} (i int) partitioned by "
+      "(p int, q int)".format(unique_database, hive_tbl))
+    self.client.execute("insert into {}.{} partition(p, q) values "
+      "(0,0,0), (0,0,1), (0,0,2)".format(unique_database, hive_tbl))
+    self.client.execute("insert into {}.{} partition(p, q) values "
+      "(0,1,0), (0,1,1)".format(unique_database, hive_tbl))
+    self.run_stmt_in_hive("truncate table {}.{} partition(p=0)"
+      .format(unique_database, hive_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+      unique_database, hive_tbl)))
+    assert data == 2
+    self.run_stmt_in_hive("truncate table {}.{}"
+      .format(unique_database, hive_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+      unique_database, hive_tbl)))
+    assert data == 0