Posted to commits@impala.apache.org by bo...@apache.org on 2023/03/01 14:56:44 UTC

[impala] 04/04: IMPALA-11626: Handle COMMIT_COMPACTION_EVENT from HMS

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 23c265d12804c91c08a08a0be92c155424ea3d99
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Wed Oct 19 09:37:17 2022 -0700

    IMPALA-11626: Handle COMMIT_COMPACTION_EVENT from HMS
    
    Since HIVE-24329, HMS emits an event when a compaction is committed,
    but Impala ignores it. Handling it allows Impala to refresh file
    metadata automatically after a compaction is committed.
    
    Testing: Added an end-to-end test that verifies the processing of a
    commit compaction event triggered in HMS. Also added an edge case in
    which the event processor handles the compacted partition being
    missing or already deleted.
    
    Change-Id: I464faedb4e3bbcd417bab2e3cb0d57e339d42605
    Reviewed-on: http://gerrit.cloudera.org:8080/19155
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  8 ++
 .../org/apache/impala/compat/MetastoreShim.java    | 15 ++++
 .../impala/catalog/events/MetastoreEvents.java     | 79 +++++++++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 38 ++++++++--
 tests/custom_cluster/test_events_custom_configs.py | 88 ++++++++++++++++++++++
 5 files changed, 221 insertions(+), 7 deletions(-)
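
Before the per-file hunks, here is a compact sketch of the flow this commit wires up: when the event processor sees a COMMIT_COMPACTION_EVENT, it pulls the compacted partition's name out of the HMS message and refreshes either that partition or, when no name is present (unpartitioned table), the whole table. The Hive classes and methods below (NotificationEvent, MessageDeserializer#getCommitCompactionMessage, CommitCompactionMessage#getPartName) are the ones this commit uses; the Refresher callback is a hypothetical stand-in for the reload calls CatalogOpExecutor makes, so treat this as an illustrative sketch rather than the actual Impala code path.

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
    import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;

    public class CommitCompactionSketch {
      /** Hypothetical stand-in for the catalog reload calls in CatalogOpExecutor. */
      public interface Refresher {
        void refreshTable(String db, String tbl);
        void refreshPartition(String db, String tbl, String partName);
      }

      /** Routes a COMMIT_COMPACTION_EVENT to a partition- or table-level refresh. */
      public static void handle(NotificationEvent event,
          MessageDeserializer deserializer, Refresher refresher) {
        if (!"COMMIT_COMPACTION_EVENT".equals(event.getEventType())) return;
        CommitCompactionMessage msg =
            deserializer.getCommitCompactionMessage(event.getMessage());
        // e.g. "year=2022"; the handler below treats null as an unpartitioned table.
        String partName = msg.getPartName();
        if (partName == null) {
          refresher.refreshTable(event.getDbName(), event.getTableName());
        } else {
          refresher.refreshPartition(event.getDbName(), event.getTableName(), partName);
        }
      }
    }
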

diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index e5235d54f..9fab5e384 100644
--- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -443,6 +443,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     throw new UnsupportedOperationException("Reload event is not supported.");
   }
 
+  /**
+   * CDP Hive-3 only function.
+   */
+  public static String getPartitionNameFromCommitCompactionEvent(
+      NotificationEvent event) {
+    throw new UnsupportedOperationException("CommitCompaction event is not supported.");
+  }
+
   /**
    * Use thrift API directly instead of HiveMetastoreClient#getNextNotification because
    * the HMS client can throw an IllegalStateException when there is a gap between the
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index bbda843d0..b31897287 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
@@ -565,6 +566,20 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     return updatedFields;
   }
 
+  /**
+   * Extracts the partition name field from the given notification event and
+   * returns it as a string.
+   *
+   * @param event Metastore notification event.
+   * @return the partition name, required for the commit compaction event.
+   */
+  public static String getPartitionNameFromCommitCompactionEvent(
+      NotificationEvent event) {
+    CommitCompactionMessage commitCompactionMessage = MetastoreEventsProcessor.
+        getMessageDeserializer().getCommitCompactionMessage(event.getMessage());
+    return commitCompactionMessage.getPartName();
+  }
+
   /**
    * Wrapper around IMetaStoreClient.getThriftClient().get_next_notification() to deal
    * with added arguments.
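
A hedged usage sketch of the new shim entry point follows; the class and method names come from this diff, while the wrapper class is illustrative only. The compat-apache-hive-3 variant above simply throws UnsupportedOperationException, presumably because only the Hive build under compat-hive-3, which includes HIVE-24329, carries the commit-compaction message classes.

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.impala.compat.MetastoreShim;

    public class ShimUsageSketch {
      /**
       * Returns the partition to refresh for a COMMIT_COMPACTION_EVENT, or null when
       * the compaction was on an unpartitioned table and the whole table should be
       * refreshed instead.
       */
      public static String partitionToRefresh(NotificationEvent event) {
        // Throws UnsupportedOperationException on the compat-apache-hive-3 shim.
        return MetastoreShim.getPartitionNameFromCommitCompactionEvent(event);
      }
    }
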
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index a8eafd569..78895d2b4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -125,6 +125,7 @@ public class MetastoreEvents {
     ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
     COMMIT_TXN("COMMIT_TXN"),
     ABORT_TXN("ABORT_TXN"),
+    COMMIT_COMPACTION("COMMIT_COMPACTION_EVENT"),
     OTHER("OTHER");
 
     private final String eventType_;
@@ -218,6 +219,8 @@ public class MetastoreEvents {
           return new ReloadEvent(catalogOpExecutor_, metrics, event);
         case INSERT:
           return new InsertEvent(catalogOpExecutor_, metrics, event);
+        case COMMIT_COMPACTION:
+          return new CommitCompactionEvent(catalogOpExecutor_, metrics, event);
         default:
           // ignore all the unknown events by creating a IgnoredEvent
           return new IgnoredEvent(catalogOpExecutor_, metrics, event);
@@ -920,6 +923,24 @@ public class MetastoreEvents {
       }
     }
 
+    protected void reloadPartitionsFromNames(List<String> partitionNames, String reason,
+        FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
+      try {
+        int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsFromNamesIfExists(
+            getEventId(), dbName_, tblName_, partitionNames, reason,
+            fileMetadataLoadOpts);
+        if (numPartsRefreshed > 0) {
+          metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
+                  .inc(numPartsRefreshed);
+        }
+      } catch (TableNotLoadedException e) {
+        debugLog("Ignoring the event since table {} is not loaded",
+            getFullyQualifiedTblName());
+      } catch (DatabaseNotFoundException | TableNotFoundException e) {
+        debugLog("Ignoring the event since table {} is not found",
+            getFullyQualifiedTblName());
+      }
+    }
 
     /**
      * To decide whether to skip processing this event, fetch table from cache
@@ -2622,6 +2643,64 @@ public class MetastoreEvents {
     }
   }
 
+  /**
+   * Metastore event handler for COMMIT_COMPACTION events. Handles
+   * COMMIT_COMPACTION event for transactional tables.
+   */
+  public static class CommitCompactionEvent extends MetastoreTableEvent {
+    private String partitionName_;
+
+    CommitCompactionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+        NotificationEvent event) throws MetastoreNotificationException {
+      super(catalogOpExecutor, metrics, event);
+      Preconditions.checkState(
+          getEventType().equals(MetastoreEventType.COMMIT_COMPACTION));
+      Preconditions.checkNotNull(event.getMessage());
+      try {
+        partitionName_ =
+            MetastoreShim.getPartitionNameFromCommitCompactionEvent(event);
+        org.apache.impala.catalog.Table tbl = catalog_.getTable(dbName_, tblName_);
+        if (tbl != null && tbl.getCreateEventId() < getEventId()) {
+          msTbl_ = tbl.getMetaStoreTable();
+        }
+      } catch (Exception ex) {
+        throw new MetastoreNotificationException(debugString("Unable to "
+            + "parse commit compaction message"), ex);
+      }
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+      try {
+        if (partitionName_ == null) {
+          reloadTableFromCatalog("Commit Compaction event", true);
+        } else {
+          reloadPartitionsFromNames(Arrays.asList(partitionName_),
+                  "Commit compaction event", FileMetadataLoadOpts.FORCE_LOAD);
+        }
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+            + "commit compaction for the table {}. Event processing cannot "
+            + "continue. Issue an invalidate metadata command to reset " +
+            "event processor.", tblName_), e);
+      }
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+          + "this event type");
+    }
+
+    @Override
+    protected boolean isEventProcessingDisabled() {
+      if (msTbl_ == null) {
+        return false;
+      }
+      return super.isEventProcessingDisabled();
+    }
+  }
+
   /**
    * An event type which is ignored. Useful for unsupported metastore event types
    */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index a99f8a00e..dcf11da26 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4497,6 +4497,35 @@ public class CatalogOpExecutor {
   public int reloadPartitionsIfExist(long eventId, String dbName, String tblName,
       List<Partition> partsFromEvent, String reason,
       FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
+    List<String> partNames = new ArrayList<>();
+    Table table = catalog_.getTable(dbName, tblName);
+    if (table instanceof HdfsTable) {
+      HdfsTable hdfsTable = (HdfsTable) table;
+      for (Partition part : partsFromEvent) {
+        partNames.add(FileUtils.makePartName(hdfsTable.getClusteringColNames(),
+            part.getValues(), null));
+      }
+    }
+    return reloadPartitionsFromNamesIfExists(eventId, dbName, tblName, partNames,
+        reason, fileMetadataLoadOpts);
+  }
+
+  /**
+   * Reloads the given partitions from partition names if they exist and have not been
+   * removed since the event was generated.
+   *
+   * @param eventId EventId being processed.
+   * @param dbName Database name for the partition
+   * @param tblName Table name for the partition
+   * @param partNames List of partition names from the events to be reloaded.
+   * @param reason Reason for reloading the partitions for logging purposes.
+   * @param fileMetadataLoadOpts describes how to reload file metadata for partNames
+   * @return the number of partitions which were reloaded. If the table does not exist,
+   * returns 0. Some partitions could be skipped if they don't exist anymore.
+   */
+  public int reloadPartitionsFromNamesIfExists(long eventId, String dbName,
+      String tblName, List<String> partNames, String reason,
+      FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
     Table table = catalog_.getTable(dbName, tblName);
     if (table == null) {
       DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
@@ -4536,13 +4565,8 @@ public class CatalogOpExecutor {
       }
       HdfsTable hdfsTable = (HdfsTable) table;
       // some partitions from the event or the table itself
-      // may not exist in HMS anymore. Hence, we collect the names here and re-fetch
+      // may not exist in HMS anymore. Hence, we re-fetch
       // the partitions from HMS.
-      List<String> partNames = new ArrayList<>();
-      for (Partition part : partsFromEvent) {
-        partNames.add(FileUtils.makePartName(hdfsTable.getClusteringColNames(),
-            part.getValues(), null));
-      }
       int numOfPartsReloaded;
       try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
         numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(
@@ -4551,7 +4575,7 @@ public class CatalogOpExecutor {
       hdfsTable.setCatalogVersion(newCatalogVersion);
       return numOfPartsReloaded;
     } catch (TableLoadingException e) {
-      LOG.info("Could not reload {} partitions of table {}", partsFromEvent.size(),
+      LOG.info("Could not reload {} partitions of table {}", partNames.size(),
           table.getFullName(), e);
     } catch (InternalException e) {
       errorOccured = true;
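
The refactor above pivots on Hive partition names: reloadPartitionsIfExist now converts the HMS Partition objects from the event into names and delegates to the new name-based reloadPartitionsFromNamesIfExists, which the commit-compaction handler can call directly. A small, self-contained illustration of that conversion is below; the clustering column and value are made-up examples, and FileUtils is assumed here to be Hive's common helper, which exposes the same 3-argument makePartName overload the diff calls.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.common.FileUtils;

    public class PartNameSketch {
      public static void main(String[] args) {
        List<String> clusteringCols = Arrays.asList("year");
        List<String> values = Arrays.asList("2022");
        // Prints "year=2022", the same form CommitCompactionMessage#getPartName()
        // carries and reloadPartitionsFromNames() expects.
        System.out.println(FileUtils.makePartName(clusteringCols, values, null));
      }
    }
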
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 36bc7ebdf..63385a325 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -266,6 +266,94 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
         .format(unique_database, test_reload_table))
     check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_commit_compaction_events(self, unique_database):
+    """Test is to verify Impala-11626, commit compaction events triggered in HMS would
+    be consumed by CatalogD's event processor.
+    """
+
+    # Test scenario 1: partitioned table
+    test_cc_part_table = "test_cc_partitioned_table"
+    self.run_stmt_in_hive(
+      "create transactional table {}.{} (i int) partitioned by (year int)"
+      .format(unique_database, test_cc_part_table))
+    for i in range(2):
+        self.run_stmt_in_hive(
+          "insert into {}.{} partition (year=2022) values (1),(2),(3)"
+          .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    parts_refreshed_before_compaction = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    self.client.execute(
+      "select * from {}.{} limit 2"
+      .format(unique_database, test_cc_part_table))
+    self.run_stmt_in_hive(
+      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
+      .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    parts_refreshed_after_compaction = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    assert parts_refreshed_after_compaction > parts_refreshed_before_compaction
+
+    # Test scenario 2: unpartitioned table
+    test_cc_unpart_tab = "test_cc_unpart_table"
+    self.run_stmt_in_hive(
+      "create transactional table {}.{} (i int)"
+      .format(unique_database, test_cc_unpart_tab))
+    for i in range(2):
+        self.run_stmt_in_hive(
+          "insert into {}.{} values (1),(2),(3)"
+          .format(unique_database, test_cc_unpart_tab))
+    EventProcessorUtils.wait_for_event_processing(self)
+    tables_refreshed_before_compaction = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    self.client.execute(
+      "select * from {}.{} limit 2"
+      .format(unique_database, test_cc_unpart_tab))
+    self.run_stmt_in_hive("alter table {}.{} compact 'minor' and wait"
+      .format(unique_database, test_cc_unpart_tab))
+    EventProcessorUtils.wait_for_event_processing(self)
+    tables_refreshed_after_compaction = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    assert tables_refreshed_after_compaction > tables_refreshed_before_compaction
+
+    # Test scenario 3: partitioned table has partition deleted
+    test_cc_part_table = "test_cc_partitioned_table_error"
+    self.run_stmt_in_hive(
+      "create transactional table {}.{} (i int) partitioned by (year int)"
+      .format(unique_database, test_cc_part_table))
+    for i in range(2):
+        self.run_stmt_in_hive(
+          "insert into {}.{} partition (year=2022) values (1),(2),(3)"
+          .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute(
+      "select * from {}.{} limit 2"
+      .format(unique_database, test_cc_part_table))
+    self.run_stmt_in_hive(
+      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
+      .format(unique_database, test_cc_part_table))
+    self.run_stmt_in_hive("alter table {}.{} Drop if exists partition(year=2022)"
+      .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
+    # Test scenario 4: process commit compaction for an unloaded table
+    test_cc_part_table = "test_cc_table_unloaded"
+    self.run_stmt_in_hive(
+      "create transactional table {}.{} (i int) partitioned by (year int)"
+      .format(unique_database, test_cc_part_table))
+    for i in range(2):
+        self.run_stmt_in_hive(
+          "insert into {}.{} partition (year=2022) values (1),(2),(3)"
+          .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.run_stmt_in_hive(
+      "alter table {}.{} partition(year=2022) compact 'minor' and wait"
+      .format(unique_database, test_cc_part_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_event_batching(self, unique_database):
     """Runs queries which generate multiple ALTER_PARTITION events which must be