You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2023/03/09 14:13:29 UTC

[impala] 07/07: IMPALA-11822: Optimize the Refresh/Invalidate event processing by skipping unnecessary events

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 148888e3ed4f97292499b2e6ee8d5a756dc648d9
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Wed Feb 8 12:01:41 2023 -0800

    IMPALA-11822: Optimize the Refresh/Invalidate event processing by skipping unnecessary events
    
    Added a new variable 'lastRefreshEventId' to catalogd's
    table/partition objects to store the latest event id
    before loading the table/partition. It is updated
    whenever a refresh or invalidate command runs. The
    event processor compares it with the current event id
    to decide whether to process or skip a reload event.
    Storing only the refresh event's id is enough, since
    an invalidate event flushes the object out of the
    cache anyway.
    
    Note: Both of the following configs must be enabled
    for this optimization to work:
    1) enable_reload_events=true
    2) enable_sync_to_latest_event_on_ddls=true
    
    Testing: Added a test that fires a few reload events
    via the HMS API and then verifies that the event
    processor skips some of the older events.
    
    Change-Id: I905957683a96c3ea01ab4bf043d6658ce37b7574
    Reviewed-on: http://gerrit.cloudera.org:8080/19484
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 21 +++++++++++++
 .../org/apache/impala/catalog/HdfsPartition.java   | 21 +++++++++++--
 .../java/org/apache/impala/catalog/HdfsTable.java  |  8 +++++
 .../main/java/org/apache/impala/catalog/Table.java | 18 ++++++++++++
 .../org/apache/impala/catalog/TableLoader.java     |  1 +
 .../impala/catalog/events/MetastoreEvents.java     | 19 +++++++++++-
 .../apache/impala/service/CatalogOpExecutor.java   | 26 ++++++++++++-----
 tests/custom_cluster/test_events_custom_configs.py | 34 ++++++++++++++++++++--
 8 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index c87cda9a2..53f423f4c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2567,6 +2567,13 @@ public class CatalogServiceCatalog extends Catalog {
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
+      // Set the last refresh event id as current HMS event id since all the metadata
+      // until the current HMS event id is refreshed at this point.
+      if (currentHmsEventId > eventId) {
+        tbl.setLastRefreshEventId(currentHmsEventId);
+      } else {
+        tbl.setLastRefreshEventId(eventId);
+      }
       return tbl.toTCatalogObject(resultType);
     } finally {
       context.stop();
@@ -3837,4 +3844,18 @@ public class CatalogServiceCatalog extends Catalog {
     return syncToLatestEventFactory_;
   }
 
+  public boolean isPartitionLoadedAfterEvent(String dbName, String tableName,
+      Partition msPartition, long eventId) {
+    try {
+      HdfsPartition hdfsPartition = getHdfsPartition(dbName, tableName, msPartition);
+      if (hdfsPartition.getLastRefreshEventId() > eventId) {
+        return true;
+      }
+    } catch (CatalogException ex) {
+      // Best effort: on failure fall through and return false so the event is processed.
+      LOG.warn("Exception while fetching the last refresh event id of a partition in " +
+          dbName + "." + tableName + ". Reloading the partition.", ex);
+    }
+    return false;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 034790fd9..bbd4bb129 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -746,6 +746,10 @@ public class HdfsPartition extends CatalogObjectImpl
   // -1 means there is no previous compaction event or compaction is not supported
   private final long lastCompactionId_;
 
+  // Latest HMS event id this partition's metadata was (re)loaded at or refreshed by.
+  // -1 means no such event id has been recorded.
+  private final long lastRefreshEventId_;
+
   /**
    * Constructor.  Needed for third party extensions that want to use their own builder
    * to construct the object.
@@ -765,7 +769,8 @@ public class HdfsPartition extends CatalogObjectImpl
         encodedInsertFileDescriptors, encodedDeleteFileDescriptors, location,
         isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor,
         partitionStats, hasIncrementalStats, numRows, writeId,
-        inFlightEvents, /*createEventId=*/-1L, /*lastCompactionId*/-1L);
+        inFlightEvents, /*createEventId=*/-1L, /*lastCompactionId=*/-1L,
+        /*lastRefreshEventId=*/-1L);
   }
 
   protected HdfsPartition(HdfsTable table, long id, long prevId, String partName,
@@ -777,7 +782,8 @@ public class HdfsPartition extends CatalogObjectImpl
       boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters,
       CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
       byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId,
-      InFlightEvents inFlightEvents, long createEventId, long lastCompactionId) {
+      InFlightEvents inFlightEvents, long createEventId, long lastCompactionId,
+      long lastRefreshEventId) {
     table_ = table;
     id_ = id;
     prevId_ = prevId;
@@ -798,6 +804,7 @@ public class HdfsPartition extends CatalogObjectImpl
     inFlightEvents_ = inFlightEvents;
     createEventId_ = createEventId;
     lastCompactionId_ = lastCompactionId;
+    lastRefreshEventId_ = lastRefreshEventId;
     if (partName == null && id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
       partName_ = FeCatalogUtils.getPartitionName(this);
     } else {
@@ -807,6 +814,8 @@ public class HdfsPartition extends CatalogObjectImpl
 
   public long getCreateEventId() { return createEventId_; }
 
+  public long getLastRefreshEventId() { return lastRefreshEventId_; }
+
   @Override // FeFsPartition
   public HdfsStorageDescriptor getInputFormatDescriptor() {
     return fileFormatDescriptor_;
@@ -1251,6 +1260,7 @@ public class HdfsPartition extends CatalogObjectImpl
     // is not active.
     private long createEventId_ = -1L;
     private long lastCompactionId_ = -1L;
+    private long lastRefreshEventId_ = -1L;
     private InFlightEvents inFlightEvents_ = new InFlightEvents();
 
     @Nullable
@@ -1318,7 +1328,7 @@ public class HdfsPartition extends CatalogObjectImpl
           encodedDeleteFileDescriptors_, location_, isMarkedCached_, accessLevel_,
           hmsParameters_, cachedMsPartitionDescriptor_, partitionStats_,
           hasIncrementalStats_, numRows_, writeId_, inFlightEvents_, createEventId_,
-          lastCompactionId_);
+          lastCompactionId_, lastRefreshEventId_);
     }
 
     public Builder setId(long id) {
@@ -1331,6 +1341,11 @@ public class HdfsPartition extends CatalogObjectImpl
       return this;
     }
 
+    public Builder setLastRefreshEventId(long eventId) {
+      lastRefreshEventId_ = eventId;
+      return this;
+    }
+
     public Builder setPrevId(long prevId) {
       prevId_ = prevId;
       return this;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 538e813c6..da83a47ac 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2876,6 +2876,13 @@ public class HdfsTable extends Table implements FeFsTable {
     FsPermissionCache permissionCache = new FsPermissionCache();
     Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
     Set<HdfsPartition.Builder> partBuildersFileMetadataRefresh = new HashSet<>();
+    long latestEventId = -1L;  // Read before the partitions are loaded; -1 if unknown.
+    try {
+      latestEventId = client.getCurrentNotificationEventId().getEventId();
+    } catch (TException exception) {
+      LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
+          exception.getMessage()), exception);
+    }
     for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
       Partition hmsPartition = entry.getKey();
       HdfsPartition oldPartition = entry.getValue();
@@ -2887,6 +2894,7 @@ public class HdfsTable extends Table implements FeFsTable {
       if (oldPartition != null) {
         partBuilder.setFileDescriptors(oldPartition);
       }
+      partBuilder.setLastRefreshEventId(latestEventId);
       switch (fileMetadataLoadOpts) {
         case FORCE_LOAD:
           partBuildersFileMetadataRefresh.add(partBuilder);
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index aaad451df..21b766ce8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -197,6 +197,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   // not by reading this flag and without acquiring read lock on table object
   protected volatile long lastSyncedEventId_ = -1;
 
+  protected volatile long lastRefreshEventId_ = -1L;
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -1028,4 +1030,20 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * Clears the in-progress modifications in case of failures.
    */
   public void resetInProgressModification() { }
+
+  public long getLastRefreshEventId() { return lastRefreshEventId_; }
+  /** Advances lastRefreshEventId_ (and, if lower, lastSyncedEventId_) to eventId. */
+  public void setLastRefreshEventId(long eventId) {
+    if (eventId > lastRefreshEventId_) {
+      lastRefreshEventId_ = eventId;
+    }
+    LOG.debug("last refreshed event id for table: {} set to: {}", getFullName(),
+        lastRefreshEventId_);
+    // Also advance lastSyncedEventId_; otherwise syncing this table to the latest
+    // event could start from an event id older than the refresh event id.
+    // TODO: Confirm that advancing lastSyncedEventId_ here is always safe.
+    if (lastSyncedEventId_ < eventId) {
+      setLastSyncedEventId(eventId);
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index bdd97a3f6..6ccae1fe1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -153,6 +153,7 @@ public class TableLoader {
         MetastoreEventsProcessor.syncToLatestEventId(catalog_, table,
             catalog_.getEventFactoryForSyncToLatestEvent(), metrics_);
       }
+      table.setLastRefreshEventId(latestEventId);  // Lets older reload events be skipped.
     } catch (TableLoadingException e) {
       table = IncompleteTable.createFailedMetadataLoadTable(db, tblName, e);
     } catch (NoSuchObjectException e) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 78895d2b4..805f58093 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2494,7 +2494,7 @@ public class MetastoreEvents {
 
     @Override
     public void process() throws MetastoreNotificationException {
-      if (isSelfEvent()) {
+      if (isSelfEvent() || isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
         infoLog("Incremented events skipped counter to {}",
@@ -2514,6 +2514,30 @@ public class MetastoreEvents {
       }
     }
 
+    /**
+     * Returns true if this reload event is older than the last refresh of the table
+     * or, for a partition level reload, of the target partition, so it can be skipped.
+     * The table is looked up here, at processing time; if the database or table is not
+     * in the catalog, or the table is an IncompleteTable, the event is not skipped.
+     */
+    private boolean isOlderEvent() {
+      org.apache.impala.catalog.Table tbl;
+      try {
+        tbl = catalog_.getTable(dbName_, tblName_);
+      } catch (org.apache.impala.catalog.DatabaseNotFoundException e) {
+        return false;
+      }
+      if (tbl == null || tbl instanceof IncompleteTable) {
+        return false;
+      }
+      // Always check the lastRefreshEventId on the table first for table level refresh
+      if (tbl.getLastRefreshEventId() > getEventId()) {
+        return true;
+      }
+      return reloadPartition_ != null && catalog_.isPartitionLoadedAfterEvent(dbName_,
+          tblName_, reloadPartition_, getEventId());
+    }
+
     /**
      * Process partition reload
      */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index bc8f1af1a..3830ffa7e 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -6435,8 +6435,8 @@ public class CatalogOpExecutor {
       }
 
       if (BackendConfig.INSTANCE.enableReloadEvents()) {
-        // fire event for refresh event
-        fireReloadEventHelper(req, updatedThriftTable, tblName, tbl);
+        // fire event for refresh event and update the last refresh event id
+        fireReloadEventAndUpdateRefreshEventId(req, updatedThriftTable, tblName, tbl);
       }
 
       // Return the TCatalogObject in the result to indicate this request can be
@@ -6480,12 +6480,13 @@ public class CatalogOpExecutor {
   /**
    * Helper class for refresh event.
    * This class invokes metastore shim's fireReloadEvent to fire event to HMS
+   * and, on refresh, records the fired event id as the last refresh event id
    * @param req - request object for TResetMetadataRequest.
    * @param updatedThriftTable - updated thrift table after refresh query
    * @param tblName
    * @param tbl
    */
-  private void fireReloadEventHelper(TResetMetadataRequest req,
+  private void fireReloadEventAndUpdateRefreshEventId(TResetMetadataRequest req,
       TCatalogObject updatedThriftTable, TableName tblName, Table tbl) {
     List<String> partVals = null;
     if (req.isSetPartition_spec()) {
@@ -6500,19 +6501,33 @@ public class CatalogOpExecutor {
           catalog_.getCatalogServiceId());
       tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
           String.valueOf(newCatalogVersion));
-      MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(),
-          req.isIs_refresh(), partVals, tblName.getDb(), tblName.getTbl(),
-          tableParams);
+      List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
+          catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(),
+          tblName.getTbl(), tableParams);
       if (req.isIs_refresh()) {
         if (catalog_.tryLock(tbl, true, 600000)) {
           catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
+          if (!eventIds.isEmpty()) {
+            if (req.isSetPartition_spec()) {
+              HdfsPartition partition = ((HdfsTable) tbl)
+                  .getPartitionFromThriftPartitionSpec(req.getPartition_spec());
+              // The spec may not match a cached partition; only update one that exists.
+              if (partition != null) {
+                HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+                partBuilder.setLastRefreshEventId(eventIds.get(0));
+                ((HdfsTable) tbl).updatePartition(partBuilder);
+              }
+            } else {
+              tbl.setLastRefreshEventId(eventIds.get(0));
+            }
+          }
         } else {
           LOG.warn(String.format("Couldn't obtain a version lock for the table: %s. " +
               "Self events may go undetected in that case",
               tbl.getName()));
         }
       }
-    } catch (TException e) {
+    } catch (TException | CatalogException e) {
       LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR,
           "fireReloadEvent") + e.getMessage());
     } finally {
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 63385a325..95fb83e6c 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -18,6 +18,9 @@ from __future__ import print_function
 import logging
 import pytest
 
+
+from hive_metastore.ttypes import FireEventRequest
+from hive_metastore.ttypes import FireEventRequestData
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS
@@ -227,8 +230,10 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
-  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1"
-                                                  " --enable_reload_events=true")
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--hms_event_polling_interval_s=5"
+                    " --enable_reload_events=true"
+                    " --enable_sync_to_latest_event_on_ddls=true")
   def test_refresh_invalidate_events(self, unique_database):
     """Test is to verify Impala-11808, refresh/invalidate commands should generate a
     Reload event in HMS and CatalogD's event processor should process this event.
@@ -265,6 +270,31 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     check_self_events("refresh {}.{} partition(year=2022)"
         .format(unique_database, test_reload_table))
     check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    # Test to verify if older events are being skipped in event processor
+    data = FireEventRequestData()
+    data.refreshEvent = True
+    req = FireEventRequest(True, data)
+    req.dbName = unique_database
+    req.tableName = test_reload_table
+    # table level reload events
+    tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+    for _ in range(10):
+      self.hive_client.fire_listener_event(req)
+    EventProcessorUtils.wait_for_event_processing(self)
+    tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+    assert tbl_events_skipped_after > tbl_events_skipped_before
+    # partition level reload events (the table level events above have already
+    # been processed, so the skipped-events counter is stable at this point)
+    part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+    req.partitionVals = ["2022"]
+    for _ in range(10):
+      self.hive_client.fire_listener_event(req)
+    EventProcessorUtils.wait_for_event_processing(self)
+    part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+    assert part_events_skipped_after > part_events_skipped_before
+
 
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_commit_compaction_events(self, unique_database):