You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/12/13 13:45:33 UTC

(impala) branch master updated: IMPALA-10987: Changing impala.disableHmsSync in Hive should not break event processing

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3112a0c0d IMPALA-10987: Changing impala.disableHmsSync in Hive should not break event processing
3112a0c0d is described below

commit 3112a0c0d17e9d3d2d79bae6e5d0dc6b4cf15eb9
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Nov 2 15:56:18 2023 -0700

    IMPALA-10987: Changing impala.disableHmsSync in
    Hive should not break event processing
    
    Currently we require a global invalidate to reset the events processor
    if the events sync is re-enabled on a table from HMS. This patch
    eliminates the need to reset the catalog cache when events sync is
    re-enabled.
    
    Implementation details: when events sync is re-enabled on a table via HMS,
    1) If the table exists in Impala,
      a) We can just invalidate the table if the current event id is greater
    than the table's create event id, so that it is reloaded the first time
    a query accesses it.
      b) Otherwise, we can just ignore the event.
    2) If the table doesn't exist in Impala, create an incomplete table if
    there is no entry in the event delete log for this table.
    
    Note: If eventSync is disabled on a table, then for all subsequent table
    events we should ideally mark the table as stale if the table object is
    loaded, so that it is reloaded the next time a query accesses it. However,
    since this approach has a performance impact, such events are ignored.
    
    Testing:
    1) Manually verified a few scenarios.
    2) Added test cases for the above scenarios.
    
    Change-Id: I37055990be49e91462ebc98aa97009ca768a0072
    Reviewed-on: http://gerrit.cloudera.org:8080/20648
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     |  72 +++++++++----
 .../events/MetastoreEventsProcessorTest.java       | 112 +++++++++++++--------
 tests/custom_cluster/test_events_custom_configs.py |  37 +++++++
 3 files changed, 162 insertions(+), 59 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 391325bc9..9d00f67c0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1053,6 +1053,16 @@ public class MetastoreEvents {
         if (tbl == null || tbl instanceof IncompleteTable) {
           return false;
         }
+        if (getEventId() > 0 && getEventId() <= tbl.getCreateEventId()) {
+          // Older event, so this event will be skipped.
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+          infoLog("Table: {} createEventId: {} is >= to the current " +
+              "eventId: {}. Incremented skipped metric to {}", tbl.getFullName(),
+              tbl.getCreateEventId(), getEventId(),
+              metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                  .getCount());
+          return true;
+        }
         // Always check the lastRefreshEventId on the table first for table level refresh
         if (tbl.getLastRefreshEventId() > getEventId() || (partitionEventObj != null &&
             catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
@@ -1522,26 +1532,15 @@ public class MetastoreEvents {
         return;
       }
       skipFileMetadataReload_ = canSkipFileMetadataReload(tableBefore_, tableAfter_);
-      // in case of table level alters from external systems it is better to do a full
-      // refresh  eg. this could be due to as simple as adding a new parameter or a
-      // full blown adding or changing column type
-      // rename is already handled above
       long startNs = System.nanoTime();
-      if (!reloadTableFromCatalog("ALTER_TABLE", false)) {
-        if (wasEventSyncTurnedOn()) {
-          // we received this alter table event on a non-existing table. We also
-          // detect that event sync was turned on in this event. This may mean that
-          // the table creation was skipped earlier because event sync was turned off
-          // we don't really know how many of events we have skipped till now because
-          // the sync was disabled all this while before we receive such a event. We
-          // error on the side of caution by stopping the event processing and
-          // letting the user to issue a invalidate metadata to reset the state
-          throw new MetastoreNotificationNeedsInvalidateException(debugString(
-              "Detected that event sync was turned on for the table %s "
-                  + "and the table does not exist. Event processing cannot be "
-                  + "continued further. Issue a invalidate metadata command to reset "
-                  + "the event processing state", getFullyQualifiedTblName()));
-        }
+      if (wasEventSyncTurnedOn()) {
+        handleEventSyncTurnedOn();
+      } else {
+        // in case of table level alters from external systems it is better to do a full
+        // refresh, eg. this could be due to as simple as adding a new parameter or a
+        // full blown adding or changing column type
+        // rename is already handled above
+        reloadTableFromCatalog("ALTER_TABLE", false);
       }
       long durationNs = System.nanoTime() - startNs;
       // Log event details for those triggered slow reload.
@@ -1552,6 +1551,41 @@ public class MetastoreEvents {
       }
     }
 
+    private void handleEventSyncTurnedOn() throws DatabaseNotFoundException,
+        MetastoreNotificationNeedsInvalidateException {
+      // Handles an alter event that turned event sync back on for this table:
+      // 1) table not in catalog: add an IncompleteTable unless a later event removed
+      //    it; 2) already an IncompleteTable: nothing to invalidate; 3) event is newer
+      //    than the table's createEventId: invalidate the table so it is reloaded;
+      // 4) otherwise the event is stale and should have been skipped already.
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_);
+      if (tbl == null) { // table doesn't exist. Go with option (1)
+        if (catalogOpExecutor_.addTableIfNotRemovedLater(getEventId(), msTbl_)) {
+          infoLog("Successfully added table {}", getFullyQualifiedTblName());
+          metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
+        } else {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+          debugLog("Incremented skipped metric to " +
+              metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                  .getCount());
+        }
+      } else if (tbl instanceof IncompleteTable) {
+        // Case (2): no-op, the table is already an IncompleteTable.
+      } else if (getEventId() > tbl.getCreateEventId()) {
+        catalog_.invalidateTable(tbl.getTableName().toThrift(),
+            new Reference<>(), new Reference<>());
+        LOG.info("Table " + tbl.getFullName() + " is invalidated from catalog cache" +
+            " since eventSync is turned on for this table.");
+      } else {
+        // Case (4): unexpected stale event; put the event processor into error state.
+        throw new MetastoreNotificationNeedsInvalidateException(debugString(
+            "Detected that event sync was turned on for the table %s "
+                + "with createEventId %s. This event should have been skipped as stale "
+                + "event. Event processing cannot be continued further. Issue a "
+                + "invalidate metadata command to reset the event processing state",
+            getFullyQualifiedTblName(), tbl.getCreateEventId()));
+      }
+    }
 
     /**
      * This method checks if the reloading of file metadata can be skipped for an alter
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 1ee5da6c5..0ebe1528d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -101,6 +101,7 @@ import org.apache.impala.catalog.FileMetadataLoadOpts;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
@@ -1682,56 +1683,87 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
-   * Test exercises the error condition in event processing when a table creation is
-   * skipped because event processing is disabled for that table. But then user alters
-   * the flag to re-enable the event processing. Since the table doesn't exist in the
-   * catalog in the first place, event processing should stop and go into error state
+   * Test exercises the event processing condition when table level HMS sync is changed
+   * from HMS. Consider the scenario where table creation is skipped because event
+   * processing is disabled for that table. But then user alters the flag to re-enable
+   * the event processing. Since the table doesn't exist in the catalog in the first
+   * place, event processing should not stop and go into error state, instead an
+   * incomplete table object should be added in cache, so that any direct access to
+   * that object fetches latest snapshot from HMS. Also this test verifies that older
+   * event of disabling HMS sync flag will be ignored if there is a drop & re-create of
+   * table in Impala. It also verifies that once disableSync is set again on a table
+   * whose object is loaded in the cache, later table events do not mark it as stale.
    */
   @Test
-  public void testEventSyncFlagTurnedOnErrorCase()
-      throws TException, CatalogException {
+  public void testEventSyncFlagChanged()
+      throws ImpalaException, TException, CatalogException {
     // when the event sync flag is changed from true to false (or null), it is possible
-    // that the table is not existing in catalog anymore. Event processing should error
-    // out in such a case
+    // that the table is not existing in catalog anymore. Event processing should not
+    // error out in such a case
     Pair<String, String> trueToFalse = new Pair<>("true", "false");
     Pair<String, String> trueToUnset = new Pair<>("true", null);
     List<Pair<String, String>> tblFlagTransitions = Arrays.asList(trueToFalse,
         trueToUnset);
     List<String> dbFlagVals = Arrays.asList(null, "false");
-    final String testTblName = "testEventSyncFlagTurnedOnErrorCase";
-    for (String dbFlag : dbFlagVals) {
-      for (Pair<String, String> tblTransition : tblFlagTransitions) {
-        Map<String, String> dbParams = new HashMap<>(1);
-        if (dbFlag != null) {
-          dbParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), dbFlag);
-        }
-        Map<String, String> tblParams = new HashMap<>(1);
-        if (tblTransition.first != null) {
-          tblParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
-              tblTransition.first);
-        }
-        createDatabase(TEST_DB_NAME, dbParams);
-        createTable(null, TEST_DB_NAME, testTblName, tblParams, false, null);
-        eventsProcessor_.processEvents();
-        // table creation is skipped since the flag says so
-        assertNull(catalog_.getTable(TEST_DB_NAME, testTblName));
-        // now turn on the flag
-        alterTableAddParameter(testTblName,
-            MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
-            tblTransition.second);
-        eventsProcessor_.processEvents();
-        // if sync to latest event id is enabled, then the event is skipped
-        // since table does not exist in cache
-        if (!BackendConfig.INSTANCE.enableSyncToLatestEventOnDdls()) {
-          assertEquals(EventProcessorStatus.NEEDS_INVALIDATE,
-              eventsProcessor_.getStatus());
+    List<Boolean> createDropFromImpala = Arrays.asList(true, false);
+    final String testTblName = "testEventSyncFlagChanged";
+    boolean prevFlagVal = BackendConfig.INSTANCE.enableSkippingOlderEvents();
+    try {
+      BackendConfig.INSTANCE.setSkippingOlderEvents(true);
+      for (String dbFlag : dbFlagVals) {
+        for (Pair<String, String> tblTransition : tblFlagTransitions) {
+          Map<String, String> dbParams = new HashMap<>(1);
+          if (dbFlag != null) {
+            dbParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
+                dbFlag);
+          }
+          Map<String, String> tblParams = new HashMap<>(1);
+          if (tblTransition.first != null) {
+            tblParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
+                tblTransition.first);
+          }
+          createDatabase(TEST_DB_NAME, dbParams);
+          createTable(null, TEST_DB_NAME, testTblName, tblParams, false, null);
+          eventsProcessor_.processEvents();
+          // table creation is skipped since the flag says so
+          assertNull(catalog_.getTable(TEST_DB_NAME, testTblName));
+          // now turn on the flag
+          for (boolean doCreateDrop : createDropFromImpala) {
+            alterTableAddParameter(testTblName,
+                MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
+                tblTransition.second);
+            if (doCreateDrop) {
+              catalog_.invalidateTable(new TTableName(TEST_DB_NAME, testTblName),
+                  new Reference<>(), new Reference<>());
+              dropTableFromImpala(TEST_DB_NAME, testTblName);
+              createTableFromImpala(TEST_DB_NAME, testTblName, null, false);
+              loadTable(TEST_DB_NAME, testTblName);
+            }
+            eventsProcessor_.processEvents();
+            assertEquals(EventProcessorStatus.ACTIVE,
+                eventsProcessor_.getStatus());
+            Table tbl = catalog_.getTable(TEST_DB_NAME, testTblName);
+            if (!doCreateDrop) {
+              assertTrue(tbl instanceof IncompleteTable);
+            } else {
+              // when the event sync is disabled on the table, make sure that any
+              // subsequent operations on the table don't mark the table as stale
+              // if the table object is loaded in the cache
+              assertTrue(tbl instanceof HdfsTable);
+              alterTableAddParameter(testTblName,
+                  MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
+                  tblTransition.first);
+              alterTableAddParameter(testTblName, "somekey", "someval");
+              eventsProcessor_.processEvents();
+              tbl = catalog_.getTable(TEST_DB_NAME, testTblName);
+              assertTrue(tbl instanceof HdfsTable);
+            }
+          }
+          dropDatabaseCascade(TEST_DB_NAME);
         }
-        // issue a catalog reset to make sure that table comes back again and event
-        // processing is active
-        catalog_.reset();
-        assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
-        dropDatabaseCascade(TEST_DB_NAME);
       }
+    } finally {
+      BackendConfig.INSTANCE.setSkippingOlderEvents(prevFlagVal);
     }
   }
 
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index fd69d8c91..1de5e06d1 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -571,6 +571,43 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     EventProcessorUtils.wait_for_event_processing(self)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
+  def test_disable_hms_sync(self, unique_database):
+    """Verifies that the event processor stays ACTIVE after processing alter table
+    events that re-enable HMS sync (impala.disableHmsSync) on a table."""
+    # test 1: re-enable disableHmsSync config at table level
+    test_table = "disable_hms_sync_table"
+    self.client.execute(
+      """create table {}.{} (i int) TBLPROPERTIES ('impala.disableHmsSync'='true')"""
+      .format(unique_database, test_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    prev_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    self.run_stmt_in_hive(
+      """ALTER TABLE {}.{} SET TBLPROPERTIES('somekey'='somevalue')"""
+      .format(unique_database, test_table))
+    self.client.execute(
+      """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
+      .format(unique_database, test_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    current_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    assert current_events_skipped >= prev_events_skipped + 1
+
+    # test 2: re-enabling disableHmsSync config on a table shouldn't put event processor
+    # in error state if the database is not loaded.
+    try:
+      test_db = "unloaded_db_sync"
+      self.run_stmt_in_hive("""create database {}""".format(test_db))
+      self.run_stmt_in_hive("""create table {}.{} (id int)
+        TBLPROPERTIES ('impala.disableHmsSync'='true')""".format(test_db, test_table))
+      self.run_stmt_in_hive(
+        """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')"""
+        .format(test_db, test_table))
+      EventProcessorUtils.wait_for_event_processing(self)
+      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    finally:
+      self.run_stmt_in_hive("""drop database if exists {} cascade""".format(test_db))
+
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=10")
   def test_event_processor_dropped_partition(self, unique_database):
     """This test verifies that impala event processor is in active state after