You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/11 18:07:59 UTC

(impala) 03/05: IMPALA-12356: Fix first ALTER_PARTITION event from Hive could be treated as self event

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 32b29ff36fb3e05fd620a6714de88805052d0117
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Fri Sep 15 12:44:27 2023 +0530

    IMPALA-12356: Fix first ALTER_PARTITION event from Hive could be
    treated as self event
    
    Since IMPALA-10502 (commit id: 7f7a631), the self-event check for
    ADD_PARTITION events is done only for transactional tables. However,
    when a new partition is added (e.g. by an INSERT statement), the
    catalog service id and version number are still added to the
    partition's params, regardless of whether the table is transactional.
    As a result, the version number is added to the partition's
    inFlightEvents_ and remains there until the next ALTER_PARTITION
    event from Hive, which is then wrongly detected as a self event.
    
    This commit ensures the catalog service id and version number are not
    added to partition params if the partition is added to a
    non-transactional table.
    
    Also fixed a related bug in RELOAD event handling. The reload event's
    self-event check fails after the above fix, because it expects the
    catalog service id and version number in the partition params. It
    now uses the last refresh event id to skip self-generated reload
    events.
    
    Testing:
    - Manually tested in cluster and added testcases
    
    Change-Id: I23c2affa3fe32c0b3843bff5e4c0018dce9060d3
    Reviewed-on: http://gerrit.cloudera.org:8080/20486
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  6 +--
 .../impala/catalog/events/MetastoreEvents.java     | 22 +++-----
 .../apache/impala/service/CatalogOpExecutor.java   | 27 +++++-----
 tests/custom_cluster/test_events_custom_configs.py | 58 +++++++++++++---------
 tests/metadata/test_event_processing.py            | 44 ++++++++++++++++
 6 files changed, 101 insertions(+), 60 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5ffe33e93..83db6f7b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -4025,9 +4025,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info("Partition {} of table {}.{} has last refresh id as {}. " +
           "Comparing it with {}.", hdfsPartition.getPartitionName(), dbName, tableName,
           hdfsPartition.getLastRefreshEventId(), eventId);
-      if (hdfsPartition.getLastRefreshEventId() > eventId) {
-        return true;
-      }
+      if (hdfsPartition.getLastRefreshEventId() >= eventId) return true;
     } catch (CatalogException ex) {
       LOG.warn("Encountered an exception while the partition's last refresh event id: "
           + dbName + "." + tableName + ". Ignoring further processing and try to " +
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 861a01adc..e7810344d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -1786,9 +1786,9 @@ public class HdfsPartition extends CatalogObjectImpl
           && hmsParameters_.equals(oldInstance.hmsParameters_)
           && partitionStats_ == oldInstance.partitionStats_
           && hasIncrementalStats_ == oldInstance.hasIncrementalStats_
-          && numRows_ == oldInstance.numRows_
-          && writeId_ == oldInstance.writeId_
-          && lastCompactionId_ == oldInstance.lastCompactionId_);
+          && numRows_ == oldInstance.numRows_ && writeId_ == oldInstance.writeId_
+          && lastCompactionId_ == oldInstance.lastCompactionId_
+          && lastRefreshEventId_ == oldInstance_.lastRefreshEventId_);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 062b9fd84..6dcd86420 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2711,22 +2711,13 @@ public class MetastoreEvents {
 
     @Override
     public SelfEventContext getSelfEventContext() {
-      if (reloadPartition_ != null) {
-        // create selfEventContext for reload partition event
-        List<TPartitionKeyValue> tPartSpec =
-            getTPartitionSpecFromHmsPartition(msTbl_, reloadPartition_);
-        return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
-            reloadPartition_.getParameters(), null);
-      } else {
-        // create selfEventContext for reload table event
-        return new SelfEventContext(
-            dbName_, tblName_, null, msTbl_.getParameters());
-      }
+      throw new UnsupportedOperationException("Self-event evaluation is unnecessary for"
+          + " this event type");
     }
 
     @Override
     public void processTableEvent() throws MetastoreNotificationException {
-      if (isSelfEvent() || isOlderEvent()) {
+      if (isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
         infoLog("Incremented events skipped counter to {}",
@@ -2750,9 +2741,10 @@ public class MetastoreEvents {
         return false;
       }
       // Always check the lastRefreshEventId on the table first for table level refresh
-      if (tbl_.getLastRefreshEventId() > getEventId() || (reloadPartition_ != null &&
-          catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, reloadPartition_,
-              getEventId()))) {
+      if (tbl_.getLastRefreshEventId() >= getEventId()
+          || (reloadPartition_ != null
+                 && catalog_.isPartitionLoadedAfterEvent(
+                        dbName_, tblName_, reloadPartition_, getEventId()))) {
         return true;
       }
       return false;
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 29178180c..edd68215c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4556,7 +4556,8 @@ public class CatalogOpExecutor {
         if (removed) {
           LOG.info(
               "EventId: {} Skipping addition of partition {} since it was removed later"
-                  + "in catalog for table {}", eventId,
+                  + " in catalog for table {}",
+              eventId,
               FileUtils.makePartName(hdfsTable.getClusteringColNames(), part.getValues()),
               hdfsTable.getFullName());
         } else {
@@ -6737,7 +6738,7 @@ public class CatalogOpExecutor {
         }
         Preconditions.checkNotNull(tbl, "tbl is null in " + cmdString);
         // fire event for refresh event and update the last refresh event id
-        fireReloadEventAndUpdateRefreshEventId(req, updatedThriftTable, tblName, tbl);
+        fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl);
       }
 
       // Return the TCatalogObject in the result to indicate this request can be
@@ -6783,31 +6784,22 @@ public class CatalogOpExecutor {
    * This class invokes metastore shim's fireReloadEvent to fire event to HMS
    * and update the last refresh event id in the cache
    * @param req - request object for TResetMetadataRequest.
-   * @param updatedThriftTable - updated thrift table after refresh query
    * @param tblName
    * @param tbl
    */
-  private void fireReloadEventAndUpdateRefreshEventId(TResetMetadataRequest req,
-      TCatalogObject updatedThriftTable, TableName tblName, Table tbl) {
+  private void fireReloadEventAndUpdateRefreshEventId(
+      TResetMetadataRequest req, TableName tblName, Table tbl) {
     List<String> partVals = null;
     if (req.isSetPartition_spec()) {
       partVals = req.getPartition_spec().stream().
           map(partSpec -> partSpec.getValue()).collect(Collectors.toList());
     }
     try {
-      // Get new catalog version for table refresh/invalidate.
-      long newCatalogVersion = updatedThriftTable.getCatalog_version();
-      Map<String, String> tableParams = new HashMap<>();
-      tableParams.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
-          catalog_.getCatalogServiceId());
-      tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
-          String.valueOf(newCatalogVersion));
       List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
           catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(),
-          tblName.getTbl(), tableParams);
+          tblName.getTbl(), Collections.emptyMap());
       if (req.isIs_refresh()) {
         if (catalog_.tryLock(tbl, true, 600000)) {
-          catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
           if (!eventIds.isEmpty()) {
             if (req.isSetPartition_spec()) {
               HdfsPartition partition = ((HdfsTable) tbl)
@@ -6982,7 +6974,12 @@ public class CatalogOpExecutor {
               partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, partName));
               partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
               partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" + partName);
-              addCatalogServiceIdentifiers(msTbl, partition);
+              if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
+                // Self event detection is deprecated for non-transactional tables add
+                // partition. So we add catalog service identifiers only for
+                // transactional tables
+                addCatalogServiceIdentifiers(msTbl, partition);
+              }
               MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse);
             }
 
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 5f5942a3e..aaebde2b9 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -233,15 +233,24 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--hms_event_polling_interval_s=5"
+                    " --enable_reload_events=true")
+  def test_refresh_invalidate_events(self, unique_database):
+    self.run_test_refresh_invalidate_events(unique_database, "reload_table")
+
   @CustomClusterTestSuite.with_args(
       catalogd_args="--hms_event_polling_interval_s=5"
                     " --enable_reload_events=true"
                     " --enable_sync_to_latest_event_on_ddls=true")
-  def test_refresh_invalidate_events(self, unique_database):
+  def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database):
+    self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True)
+
+  def run_test_refresh_invalidate_events(self, unique_database, test_reload_table,
+    enable_sync_to_latest_event_on_ddls=False):
     """Test is to verify Impala-11808, refresh/invalidate commands should generate a
     Reload event in HMS and CatalogD's event processor should process this event.
     """
-    test_reload_table = "test_reload_table"
     self.client.execute(
       "create table {}.{} (i int) partitioned by (year int) "
         .format(unique_database, test_reload_table))
@@ -275,28 +284,29 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
     EventProcessorUtils.wait_for_event_processing(self)
 
-    # Test to verify if older events are being skipped in event processor
-    data = FireEventRequestData()
-    data.refreshEvent = True
-    req = FireEventRequest(True, data)
-    req.dbName = unique_database
-    req.tableName = test_reload_table
-    # table level reload events
-    tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
-    for i in range(10):
-      self.hive_client.fire_listener_event(req)
-    EventProcessorUtils.wait_for_event_processing(self)
-    tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
-    assert tbl_events_skipped_after > tbl_events_skipped_before
-    # partition level reload events
-    EventProcessorUtils.wait_for_event_processing(self)
-    part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
-    req.partitionVals = ["2022"]
-    for i in range(10):
-      self.hive_client.fire_listener_event(req)
-    EventProcessorUtils.wait_for_event_processing(self)
-    part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
-    assert part_events_skipped_after > part_events_skipped_before
+    if enable_sync_to_latest_event_on_ddls:
+      # Test to verify if older events are being skipped in event processor
+      data = FireEventRequestData()
+      data.refreshEvent = True
+      req = FireEventRequest(True, data)
+      req.dbName = unique_database
+      req.tableName = test_reload_table
+      # table level reload events
+      tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+      for i in range(10):
+        self.hive_client.fire_listener_event(req)
+      EventProcessorUtils.wait_for_event_processing(self)
+      tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+      assert tbl_events_skipped_after > tbl_events_skipped_before
+      # partition level reload events
+      EventProcessorUtils.wait_for_event_processing(self)
+      part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+      req.partitionVals = ["2022"]
+      for i in range(10):
+        self.hive_client.fire_listener_event(req)
+      EventProcessorUtils.wait_for_event_processing(self)
+      part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+      assert part_events_skipped_after > part_events_skipped_before
 
     # Test to verify IMPALA-12213
     table = self.hive_client.get_table(unique_database, test_reload_table)
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index f0358b7aa..ff764db78 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -432,6 +432,50 @@ class TestEventProcessing(ImpalaTestSuite):
     finally:
       check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])
 
+  def test_transact_partition_location_change_from_hive(self, unique_database):
+    """IMPALA-12356: Verify alter partition from hive on transactional table"""
+    self.run_test_partition_location_change_from_hive(unique_database,
+                                                      "transact_alter_part_hive", True)
+
+  def test_partition_location_change_from_hive(self, unique_database):
+    """IMPALA-12356: Verify alter partition from hive on non-transactional table"""
+    self.run_test_partition_location_change_from_hive(unique_database, "alter_part_hive")
+
+  def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
+    is_transactional=False):
+    fq_tbl_name = unique_database + "." + tbl_name
+    TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
+    # Create the table
+    self.client.execute(
+      "create table %s (i int) partitioned by(j int) stored as parquet %s"
+      % (fq_tbl_name, TBLPROPERTIES))
+    # Insert some data to a partition
+    p1 = "j=1"
+    self.client.execute("insert into table %s partition(%s) values (0),(1),(2)"
+                        % (fq_tbl_name, p1))
+    tbl_location = self._get_table_property("Location:", fq_tbl_name)
+    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
+    assert [("{0}/{1}".format(tbl_location, p1),)] == partitions
+    # Alter partition location from hive
+    new_part_location = tbl_location + "/j=2"
+    self.run_stmt_in_hive("alter table %s partition(%s) set location '%s'"
+                          % (fq_tbl_name, p1, new_part_location))
+    EventProcessorUtils.wait_for_event_processing(self)
+    # Verify if the location is updated
+    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
+    assert [(new_part_location,)] == partitions
+
+  def _get_table_property(self, property_name, table_name):
+    """Extract the table property value from output of DESCRIBE FORMATTED."""
+    result = self.client.execute("describe formatted {0}".format(table_name))
+    for row in result.data:
+      if property_name in row:
+        row = row.split('\t')
+        if row[1] == 'NULL':
+          break
+        return row[1].rstrip()
+    return None
+
   def __get_transactional_tblproperties(self, is_transactional):
     """
     Util method to generate the tblproperties for transactional tables