You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/07/11 22:14:21 UTC

[impala] branch master updated: IMPALA-12256: Fix DDLs losing create event ids in reloading partitions

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fed883379 IMPALA-12256: Fix DDLs losing create event ids in reloading partitions
fed883379 is described below

commit fed883379383b368bbce4f3b36b95e01a7378578
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jun 30 16:22:46 2023 +0800

    IMPALA-12256: Fix DDLs losing create event ids in reloading partitions
    
    When HMS event-processor is enabled, each partition will track the
    event id of the ADD_PARTITION event that corresponds to its creation.
    The create event id is used to skip stale DROP_PARTITION events that
    have lower event ids.
    
    However, the partition-level create event ids are not correctly retained
    during DDLs. DDLs that modify the partition metadata will reload the
    partitions. Since HdfsPartition objects are immutable, the reload is
    performed as replacing the old one with the new object created by
    HdfsPartition.Builder. The HdfsPartition.Builder is built from the old
    HdfsPartition object. But it doesn't copy the create event id. This
    patch fixes the issue, also adds logs for comparing partition event ids.
    
    Tests:
     - Add e2e tests to verify DDLs won't lose the partition-level create
       event ids, so stale ADD_PARTITION events can be skipped as expected.
    
    Change-Id: I052faa093bda69fb16db0d424c1478bba103dad9
    Reviewed-on: http://gerrit.cloudera.org:8080/20145
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  3 +
 .../org/apache/impala/catalog/HdfsPartition.java   |  3 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  2 +
 .../apache/impala/service/CatalogOpExecutor.java   | 17 +++---
 tests/custom_cluster/test_events_custom_configs.py | 68 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 9b404dc98..724b3bc94 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3900,6 +3900,9 @@ public class CatalogServiceCatalog extends Catalog {
       Partition msPartition, long eventId) {
     try {
       HdfsPartition hdfsPartition = getHdfsPartition(dbName, tableName, msPartition);
+      LOG.info("Partition {} of table {}.{} has last refresh id as {}. " +
+          "Comparing it with {}.", hdfsPartition.getPartitionName(), dbName, tableName,
+          hdfsPartition.getLastRefreshEventId(), eventId);
       if (hdfsPartition.getLastRefreshEventId() > eventId) {
         return true;
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 855637982..8afd439cb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -1323,7 +1323,10 @@ public class HdfsPartition extends CatalogObjectImpl
       }
       // Take over the in-flight events
       inFlightEvents_ = partition.inFlightEvents_;
+      // Don't lose the event ids
+      createEventId_ = partition.createEventId_;
       lastCompactionId_ = partition.lastCompactionId_;
+      lastRefreshEventId_ = partition.lastRefreshEventId_;
       return this;
     }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 7037df6de..dba18957b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2901,6 +2901,8 @@ public class HdfsTable extends Table implements FeFsTable {
           oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
       if (oldPartition != null) {
         partBuilder.setFileDescriptors(oldPartition);
+        partBuilder.setCreateEventId(oldPartition.getCreateEventId());
+        partBuilder.setLastCompactionId(oldPartition.getLastCompactionId());
       }
       partBuilder.setLastRefreshEventId(latestEventId);
       switch (fileMetadataLoadOpts) {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b4efe74ee..ce4cc6f24 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4547,14 +4547,15 @@ public class CatalogOpExecutor {
       return false;
     }
     // if the partition has been created since the event was generated, skip
-    // dropping the event.
-    if (hdfsPartition.getCreateEventId() > eventId) {
-      LOG.info("Not dropping partition {} of table {} since it's create event id {} is "
-              + "higher than eventid {}", hdfsPartition.getPartitionName(),
-          hdfsTable.getFullName(), hdfsPartition.getCreateEventId(), eventId);
-      return false;
-    }
-    return true;
+    // the stale event.
+    boolean isStale = hdfsPartition.getCreateEventId() > eventId;
+    LOG.info("{} partition {} of table {} since it's create event id {} is {} than " +
+            "eventid {}",
+        isStale ? "Not dropping" : "Dropping",
+        hdfsPartition.getPartitionName(), hdfsTable.getFullName(),
+        hdfsPartition.getCreateEventId(), isStale ? "higher" : "not higher",
+        eventId);
+    return !isStale;
   }
 
   /**
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 0e8a9cd28..43d77bac9 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -825,3 +825,71 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       tblproperties = "tblproperties ('transactional'='true'," \
                       "'transactional_properties'='insert_only')"
     return tblproperties
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
+  def test_stale_drop_partition_events(self, unique_database):
+    """Regression Tests for IMPALA-12256. Verifies stale DROP_PARTITION events are
+    skipped even if they are processed late after some other DDLs. Uses a higher polling
+    interval to ensure late processing on the events"""
+    self.client.execute(
+        "create table %s.part(i int) partitioned by (p int) stored as textfile"
+        % unique_database)
+    self.client.execute(
+        "insert into %s.part partition (p=0) values (0)" % unique_database)
+    # These DDLs will reload the partition metadata. We will verify they don't lose
+    # the create event ids after the reload.
+    partition_ddls = [
+      "compute stats %s.part" % unique_database,
+      "compute incremental stats %s.part" % unique_database,
+      "compute incremental stats %s.part partition(p=0)" % unique_database,
+      "alter table %s.part partition(p=0) set row format"
+      "  delimited fields terminated by ','" % unique_database,
+      "alter table %s.part partition(p=0) set fileformat parquet" % unique_database,
+      "alter table %s.part partition(p=0) set location '/tmp'" % unique_database,
+      "alter table %s.part partition(p=0) set tblproperties('k'='v')" % unique_database,
+      "refresh %s.part partition(p=0)" % unique_database,
+      "refresh %s.part" % unique_database,
+    ]
+    # Wait until the events in preparing the table are consumed.
+    EventProcessorUtils.wait_for_event_processing(self)
+    parts_added_before = EventProcessorUtils.get_int_metric("partitions-added")
+    parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed")
+    parts_removed_before = EventProcessorUtils.get_int_metric("partitions-removed")
+    for ddl in partition_ddls:
+      events_skipped_before = EventProcessorUtils.get_int_metric("events-skipped")
+      # Drop-create the partition and then run a DDL on it. A DROP_PARTITION and an
+      # ADD_PARTITION event will be generated and should be skipped. The 3rd DDL might
+      # generate an ALTER_PARTITION event but it should be skipped as self-event.
+      # Note that we don't perform self-event detection on ADD/DROP_PARTITION events.
+      # They are skipped based on the partition level create event ids. So we should see
+      # no partitions are added/removed/refreshed if we correctly track the create event
+      # id (saved by the 2nd DDL that creates the partition).
+      # For the DROP_PARTITION event, there are 3 cases:
+      # 1) The DROP_PARTITION event is processed before the INSERT statement.
+      #    It's skipped since the partition doesn't exist.
+      # 2) The DROP_PARTITION event is processed after the INSERT statement
+      #    and before the 3rd DDL. The INSERT statement creates the partition so saves
+      #    the create event id which is higher than the id of the DROP_PARTITION event.
+      #    Thus the DROP_PARTITION event is skipped.
+      # 3) The DROP_PARTITION event is processed after the 3rd DDL. The reload triggered
+      #    by the DDL should keep track of the create event id so the DROP_PARTITION event
+      #    can be skipped.
+      # This test sets hms_event_polling_interval_s to 5 which is long enough for the
+      # 3 DDLs to finish. So it's more likely the 3rd case would happen, which is the
+      # case of IMPALA-12256.
+      self.client.execute(
+          "alter table %s.part drop partition (p=0)" % unique_database)
+      self.client.execute(
+          "insert into %s.part partition(p=0) values (1),(2)" % unique_database)
+      self.client.execute(ddl)
+      EventProcessorUtils.wait_for_event_processing(self)
+      events_skipped_after = EventProcessorUtils.get_int_metric("events-skipped")
+      parts_added_after = EventProcessorUtils.get_int_metric("partitions-added")
+      parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed")
+      parts_removed_after = EventProcessorUtils.get_int_metric("partitions-removed")
+      # Event-processor should not update any partitions since all events should be
+      # skipped
+      assert parts_removed_before == parts_removed_after
+      assert parts_added_before == parts_added_after
+      assert parts_refreshed_before == parts_refreshed_after
+      assert events_skipped_after > events_skipped_before