You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/08/30 05:24:41 UTC

[impala] 02/02: IMPALA-11535: Skip older events in the event processor based on the latestRefreshEventID

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b718d63860356a04814e07d91711c3c748b3e769
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Wed Jun 7 16:51:15 2023 -0700

    IMPALA-11535: Skip older events in the event processor based on the
    latestRefreshEventID
    
    Summary: If the table has been manually refreshed, all its events
    happen before the manual REFRESH can be skipped. This happens when
    catalogd is lagging behind in processing events. When processing an
    event, we can check whether there are manual REFRESH executed after
    its eventTime. In such case, we don't need to process the event to
    refresh anything. This helps catalogd to catch up HMS events quickly.
    
    Implementation details: Updated the lastRefreshEventId on the table or
    partition whenever there is table or partition level refresh/load.
    By comparing the lastRefreshEventId to current event id in the event
    processor the older events can be skipped.
    
    set enable_skipping_older_events to true to enable this optimization
    
    Testing:
    - Unit end-to-end test and unit test to test the functionality.
    
    Change-Id: Ic0dc5c7396d80616680d8a5805ce80db293b72e1
    Reviewed-on: http://gerrit.cloudera.org:8080/20022
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 10 ++++
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../java/org/apache/impala/catalog/HdfsTable.java  | 17 +++---
 .../main/java/org/apache/impala/catalog/Table.java |  2 +-
 .../impala/catalog/events/MetastoreEvents.java     | 57 ++++++++++++++++++-
 .../catalog/events/MetastoreEventsProcessor.java   | 11 ++++
 .../org/apache/impala/service/BackendConfig.java   |  9 +++
 .../apache/impala/service/CatalogOpExecutor.java   | 18 ++++++
 .../events/MetastoreEventsProcessorTest.java       | 44 +++++++++++++++
 tests/custom_cluster/test_events_custom_configs.py | 64 ++++++++++++++++++++++
 11 files changed, 227 insertions(+), 9 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 6d2b47c96..bcbd58e1e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -138,6 +138,16 @@ DEFINE_string(file_metadata_reload_properties, "EXTERNAL, metadata_location,"
     "refresh file metadata when these properties are changed. To skip this optimization,"
     "set the value to empty string");
 
+DEFINE_bool(enable_skipping_older_events, false, "This configuration is used to skip any"
+    "older events in the event processor based on the lastRefreshEventId on the"
+    "database/table/partition in the cache. All the DML queries that change the metadata"
+    "in the catalogD's cache will update the lastRefreshEventId i.e.., fetch the latest"
+    "available event on HMS and set it on the object. In case the event processor is"
+    "lagging, the older events in event processor queue can be skipped by comparing the"
+    "current event id to that of lastRefreshEventId. The default is set to false to"
+    "disable the optimisation. Set this true to enable skipping the older events and"
+    "quickly catch with the events of HMS");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c10f472e4..ae9d61ba9 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -108,6 +108,7 @@ DECLARE_int32(thrift_rpc_max_message_size);
 DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
 DECLARE_int32(iceberg_reload_new_files_threshold);
+DECLARE_bool(enable_skipping_older_events);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -423,6 +424,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
   cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
   cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
+  cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 6cee3ab7a..3943021df 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -260,4 +260,6 @@ struct TBackendGflags {
   114: required bool use_jamm_weigher
 
   115: required i32 iceberg_reload_new_files_threshold
+
+  116: required bool enable_skipping_older_events;
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0d8b6ba80..70ede2d5e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -56,6 +56,7 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
@@ -1927,6 +1928,7 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       partBuilders.add(partBuilder);
     }
+    long latestEventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(client);
     long fileMdLoadTime = loadFileMetadataForPartitions(client, partBuilders,
         /* isRefresh=*/false);
     for (HdfsPartition.Builder p : partBuilders) {
@@ -1935,7 +1937,12 @@ public class HdfsTable extends Table implements FeFsTable {
       } else {
         updatePartition(p);
       }
+      if (latestEventId > -1) {
+        p.setLastRefreshEventId(latestEventId);
+      }
     }
+    LOG.info("Setting the latest refresh event id to {} for the loaded partitions for "
+        + "the table {}", latestEventId, getFullName());
     return fileMdLoadTime;
   }
 
@@ -2895,13 +2902,7 @@ public class HdfsTable extends Table implements FeFsTable {
     FsPermissionCache permissionCache = new FsPermissionCache();
     Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
     Set<HdfsPartition.Builder> partBuildersFileMetadataRefresh = new HashSet<>();
-    long latestEventId = -1L;
-    try {
-      latestEventId = client.getCurrentNotificationEventId().getEventId();
-    } catch (TException exception) {
-      LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
-          exception.getMessage()));
-    }
+    long latestEventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(client);
     for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
       Partition hmsPartition = entry.getKey();
       HdfsPartition oldPartition = entry.getValue();
@@ -2935,6 +2936,8 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       partBuilderToPartitions.put(partBuilder, oldPartition);
     }
+    LOG.info("Setting the latest refresh event id to {} for the reloaded {} partitions",
+        latestEventId, partBuilderToPartitions.size());
     if (!partBuildersFileMetadataRefresh.isEmpty()) {
       LOG.info("for table {}, file metadataOps: {}, refreshing file metadata for {}"
               + " out of {} partitions to reload in reloadPartitions()", getFullName(),
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2b70bdeeb..2364f82f9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -1048,7 +1048,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     if (eventId > lastRefreshEventId_) {
       lastRefreshEventId_ = eventId;
     }
-    LOG.debug("last refreshed event id for table: {} set to: {}", getFullName(),
+    LOG.info("last refreshed event id for table: {} set to: {}", getFullName(),
         lastRefreshEventId_);
     // TODO: Should we reset lastSyncedEvent Id if it is less than event Id?
     // If we don't reset it - we may start syncing table from an event id which
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index e26acaff4..593a8de79 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1036,6 +1036,38 @@ public class MetastoreEvents {
       }
       return true;
     }
+
+    protected boolean isOlderEvent(Partition partitionEventObj) {
+      if (!BackendConfig.INSTANCE.enableSkippingOlderEvents()) {
+        return false;
+      }
+      org.apache.impala.catalog.Table tbl = null;
+      try {
+        tbl = catalog_.getTable(dbName_, tblName_);
+        if (tbl == null || tbl instanceof IncompleteTable) {
+          return false;
+        }
+        // Always check the lastRefreshEventId on the table first for table level refresh
+        if (tbl.getLastRefreshEventId() > getEventId() || (partitionEventObj != null &&
+            catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
+                partitionEventObj, getEventId()))) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(getNumberOfEvents());
+          String messageStr = partitionEventObj == null ? "Skipping the event since the" +
+              " table " + dbName_+ "." + tblName_ + " has last refresh id as " +
+              tbl.getLastRefreshEventId() + ". Comparing it with current event " +
+              getEventId() + ". " : "";
+          infoLog("{}Incremented events skipped counter to {}", messageStr,
+              metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                  .getCount());
+          return true;
+        }
+      } catch (CatalogException e) {
+        debugLog("ignoring exception while checking if it is an older event "
+            + "on table {}.{}", dbName_, tblName_, e);
+      }
+      return false;
+    }
   }
 
   /**
@@ -1293,7 +1325,13 @@ public class MetastoreEvents {
     @Override
     public void process() throws MetastoreNotificationException {
       if (isSelfEvent()) {
-        infoLog("Not processing the event as it is a self-event");
+        infoLog("Not processing the insert event as it is a self-event");
+        return;
+      }
+
+      if (isOlderEvent(insertPartition_)) {
+        infoLog("Not processing the insert event {} as it is an older event",
+            getEventId());
         return;
       }
       // Reload the whole table if it's a transactional table or materialized view.
@@ -1458,6 +1496,12 @@ public class MetastoreEvents {
         return;
       }
 
+      if (isOlderEvent(null)) {
+        infoLog("Not processing the alter table event {} as it is an older event",
+            getEventId());
+        return;
+      }
+
       // Determine whether this is an event which we have already seen or if it is a new
       // event
       if (isSelfEvent()) {
@@ -2153,6 +2197,12 @@ public class MetastoreEvents {
         return;
       }
 
+      if (isOlderEvent(partitionBefore_)) {
+        infoLog("Not processing the alter partition event {} as it is an older event",
+            getEventId());
+        return;
+      }
+
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       if (canBeSkipped()) {
@@ -2291,6 +2341,11 @@ public class MetastoreEvents {
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
       for (T event : batchedEvents_) {
+        if (isOlderEvent(event.getPartitionForBatching())) {
+          infoLog("Not processing the current event id {} as it is an older event",
+              event.getEventId());
+          continue;
+        }
         if (!event.canBeSkipped()) {
           eventsToProcess.add(event);
         }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 19b7ddc36..c392ef980 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -715,6 +715,17 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     }
   }
 
+  public static long getCurrentEventIdNoThrow(IMetaStoreClient client) {
+    long latestEventId = -1L;
+    try {
+      latestEventId = client.getCurrentNotificationEventId().getEventId();
+    } catch (TException exception) {
+      LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
+          exception.getMessage()));
+    }
+    return latestEventId;
+  }
+
   /**
    * Starts the event processor from a given event id
    */
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 29352d908..f5c7570bf 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -365,6 +365,15 @@ public class BackendConfig {
     backendCfg_.enable_reload_events = flag;
   }
 
+  public boolean enableSkippingOlderEvents() {
+    return backendCfg_.enable_skipping_older_events;
+  }
+
+  @VisibleForTesting
+  public void setSkippingOlderEvents(boolean flag) {
+    backendCfg_.enable_skipping_older_events = flag;
+  }
+
   public boolean pullTableTypesAndComments() {
     return backendCfg_.pull_table_types_and_comments;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 13d5e4161..256666d1b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1551,9 +1551,14 @@ public class CatalogOpExecutor {
       String reason, @Nullable String debugAction)
       throws CatalogException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+    long eventId = -1L;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
+      if (msTbl.getPartitionKeysSize() == 0) {
+        eventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(
+            msClient.getHiveClient());
+      }
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
             reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate,
@@ -1561,6 +1566,12 @@ public class CatalogOpExecutor {
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
+      // Update the lastRefreshEventId at the table level if it is unpartitioned table
+      // if it is partitioned table, partitions are updated in HdfsTable#load() method
+      if (msTbl.getPartitionKeysSize() == 0 && eventId > tbl.getLastRefreshEventId()
+          && reloadFileMetadata && reloadTableSchema) {
+        tbl.setLastRefreshEventId(eventId);
+      }
     }
     tbl.setCatalogVersion(newCatalogVersion);
   }
@@ -1635,11 +1646,18 @@ public class CatalogOpExecutor {
         LOG.trace(String.format("Altering view %s", tableName));
       }
       applyAlterTable(msTbl);
+      long eventId = -1L;
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        eventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow(
+            msClient.getHiveClient());
         tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
       }
       addSummary(resp, "View has been altered.");
       tbl.setCatalogVersion(newCatalogVersion);
+      // Update the last refresh event id at table level
+      if (eventId > tbl.getLastRefreshEventId()) {
+        tbl.setLastRefreshEventId(eventId);
+      }
       addTableToCatalogUpdate(tbl, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index e71e3afe6..bcf75c16d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -3207,6 +3207,50 @@ public class MetastoreEventsProcessorTest {
         catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals2));
   }
 
+  /**
+   * Test verifies if lastRefreshEventId is updated or not after a table is refreshed
+   * This is useful for skipping older events in the event processor
+   * @throws Exception
+   */
+  @Test
+  public void testSkippingOlderEvents() throws Exception {
+    BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(true);
+    BackendConfig.INSTANCE.setSkippingOlderEvents(true);
+    createDatabase(TEST_DB_NAME, null);
+    final String testTblName = "testSkippingOlderEvents";
+    createTable(testTblName, true);
+    eventsProcessor_.processEvents();
+    AlterTableExecutor hiveExecutor = new HiveAlterTableExecutor(TEST_DB_NAME,
+        testTblName);
+    hiveExecutor.execute();
+    HdfsTable testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testTblName,
+        "test", null);
+    long lastSyncEventIdBefore = testTbl.getLastRefreshEventId();
+    alterTableAddParameter(testTblName, "somekey", "someval");
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    assertTrue(testTbl.getLastRefreshEventId() > lastSyncEventIdBefore);
+    confirmTableIsLoaded(TEST_DB_NAME, testTblName);
+    final String testUnpartTblName = "testUnPartSkippingOlderEvents";
+    createTable(testUnpartTblName, false);
+    testInsertEvents(TEST_DB_NAME, testUnpartTblName, false);
+    testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testUnpartTblName,
+        "test", null);
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    confirmTableIsLoaded(TEST_DB_NAME, testUnpartTblName);
+    // Verify older HMS events are skipped by doing refresh in Impala
+    alterTableAddCol(testUnpartTblName, "newCol", "string", "test new column");
+    testTbl = (HdfsTable)catalog_.getOrLoadTable(TEST_DB_NAME, testUnpartTblName,
+        "test", null);
+    lastSyncEventIdBefore = testTbl.getLastRefreshEventId();
+    catalog_.reloadTable(testTbl, "test");
+    eventsProcessor_.processEvents();
+    assertEquals(testTbl.getLastRefreshEventId(), eventsProcessor_.getCurrentEventId());
+    assertTrue(testTbl.getLastRefreshEventId() > lastSyncEventIdBefore);
+    confirmTableIsLoaded(TEST_DB_NAME, testTblName);
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 43d77bac9..e37c45ae6 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -311,6 +311,70 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     EventProcessorUtils.wait_for_event_processing(self)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=5"
+                  " --enable_skipping_older_events=true"
+                  " --enable_sync_to_latest_event_on_ddls=true")
+  def test_skipping_older_events(self, unique_database):
+    """Test is to verify IMPALA-11535, event processor should ignore older events if the
+    current event id is older than the lastRefreshEventId on the table/partition
+    """
+    test_old_table = "test_old_table"
+
+    def verify_skipping_older_events(table_name, is_transactional, is_partitioned):
+      query = " ".join(["create", "transactional" if is_transactional else '',
+        "table {}.{} (i int)", "partitioned by (year int)" if is_partitioned else ''])
+      self.run_stmt_in_hive(query.format(unique_database, table_name))
+      values = "values (10),(20),(30)"
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      def verify_skipping_hive_stmt_events(stmt, new_table_name):
+        tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+        self.run_stmt_in_hive(stmt)
+        self.client.execute(
+          "refresh {}.{}".format(unique_database, new_table_name))
+        tables_refreshed_before = EventProcessorUtils.get_int_metric("tables-refreshed")
+        partitions_refreshed_before = \
+          EventProcessorUtils.get_int_metric("partitions-refreshed")
+        EventProcessorUtils.wait_for_event_processing(self)
+        tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
+        assert tbl_events_skipped_after > tbl_events_skipped_before
+        tables_refreshed_after = EventProcessorUtils.get_int_metric("tables-refreshed")
+        partitions_refreshed_after = \
+          EventProcessorUtils.get_int_metric("partitions-refreshed")
+        if is_partitioned:
+          assert partitions_refreshed_after == partitions_refreshed_before
+        else:
+          assert tables_refreshed_after == tables_refreshed_before
+
+      # test single insert event
+      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
+        if is_partitioned else '', values])
+      verify_skipping_hive_stmt_events(
+        query.format(unique_database, table_name), table_name)
+      # test batch insert events
+      query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)"
+        if is_partitioned else '', values, ";"])
+      complete_query = ""
+      for _ in range(3):
+        complete_query += query.format(unique_database, table_name)
+      verify_skipping_hive_stmt_events(complete_query, table_name)
+      # Dynamic partitions test
+      query = " ".join(["create", "transactional" if is_transactional else '',
+        "table `{}`.`{}` (i int)",
+        "partitioned by (year int)" if is_partitioned else '', ";"])
+      complete_query = query.format(unique_database, "new_table")
+      complete_query += "insert overwrite table `{db}`.`{tbl1}` " \
+        "select * from `{db}`.`{tbl2}`"\
+        .format(db=unique_database, tbl1="new_table", tbl2=table_name)
+      verify_skipping_hive_stmt_events(complete_query, "new_table")
+      # Drop the tables before running another test
+      self.client.execute("drop table {}.{}".format(unique_database, table_name))
+      self.client.execute("drop table {}.{}".format(unique_database, "new_table"))
+    verify_skipping_older_events(test_old_table, False, False)
+    verify_skipping_older_events(test_old_table, True, False)
+    verify_skipping_older_events(test_old_table, False, True)
+    verify_skipping_older_events(test_old_table, True, True)
 
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_commit_compaction_events(self, unique_database):