You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/12/04 08:26:00 UTC

[impala] branch master updated: IMPALA-10886: Fix loss of createEventId for INSERT created partitions

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2134e  IMPALA-10886: Fix loss of createEventId for INSERT created partitions
6e2134e is described below

commit 6e2134ebdf0771fff74e89bfd223ae24937cfadf
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Dec 3 10:05:49 2021 +0800

    IMPALA-10886: Fix loss of createEventId for INSERT created partitions
    
    Coordinator calls updateCatalog RPC to catalogd to finalize the INSERT
    statement. Catalogd will create any new partitions and reload metadata
    of the updated partitions (including new partitions).
    
    After IMPALA-10502, each new partition has a createEventId which is used
    to detect how "fresh" it is. If the event processor receives a
    DROP_PARTITION event, it will compare its event id with the
    createEventId of the partition. Only DROP_PARTITION events happen after
    the createEventId will be evaluated.
    
    There is a bug in CatalogOpExecutor#updateCatalog() that we loss the
    createEventId of the new partitions. It should be used in the final call
    of loadTableMetadata(). This bug causes intermittent failures in
    TestReusePartitionMetadata.test_reuse_partition_meta. The last two DMLs
    of the test is dropping a partition and then creating it back by an
    INSERT. If the DROP_PARTITION event is processed after the INSERT
    finishes, the partition will be dropped incorrectly.
    
    Tests
     - Ran the test 100 times locally. Without the fix, it fails in 10 runs.
    
    Change-Id: I2622c28a5ce6084fc77f6ea475d2633445c7f8dd
    Reviewed-on: http://gerrit.cloudera.org:8080/18066
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../java/org/apache/impala/service/CatalogOpExecutor.java |  4 ++--
 tests/custom_cluster/test_local_catalog.py                | 15 ++++++++-------
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 860b5d7..ba13dbe 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4082,7 +4082,7 @@ public class CatalogOpExecutor {
       } else {
         LOG.info(
             "EventId: {} Skipping removal of {}/{} partitions since they don't exist or"
-                + "were created later in table {}.", eventId, skippedPartitions.size(),
+                + " were created later in table {}.", eventId, skippedPartitions.size(),
             droppedPartitions.size(), table.getFullName());
       }
       List<List<TPartitionKeyValue>> allTPartKeyVals = Lists
@@ -6258,7 +6258,7 @@ public class CatalogOpExecutor {
       }
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
-          "INSERT");
+          partitionToEventId, "INSERT");
       addTableToCatalogUpdate(table, update.header.want_minimal_response,
           response.result);
     } finally {
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index dcaa791..ca6affb 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -557,39 +557,40 @@ class TestReusePartitionMetadata(CustomClusterTestSuite):
     # Make sure the table is unloaded either in catalogd or coordinator.
     self.execute_query("invalidate metadata %s.alltypes" % unique_database)
     # First time: misses all(24) partitions.
-    self.check_missing_partitions(unique_database, 24)
+    self.check_missing_partitions(unique_database, 24, 24)
     # Second time: hits all(24) partitions.
-    self.check_missing_partitions(unique_database, 0)
+    self.check_missing_partitions(unique_database, 0, 24)
 
     # Alter comment on the table. Partition metadata should be reusable.
     self.execute_query(
         "comment on table %s.alltypes is null" % unique_database)
-    self.check_missing_partitions(unique_database, 0)
+    self.check_missing_partitions(unique_database, 0, 24)
 
     # Refresh one partition. Although table version bumps, metadata cache of other
     # partitions should be reusable.
     self.execute_query(
         "refresh %s.alltypes partition(year=2009, month=1)" % unique_database)
-    self.check_missing_partitions(unique_database, 1)
+    self.check_missing_partitions(unique_database, 1, 24)
 
     # Drop one partition. Although table version bumps, metadata cache of existing
     # partitions should be reusable.
     self.execute_query(
         "alter table %s.alltypes drop partition(year=2009, month=1)" % unique_database)
-    self.check_missing_partitions(unique_database, 0)
+    self.check_missing_partitions(unique_database, 0, 23)
 
     # Add back one partition. The partition meta is loaded in catalogd but not the
     # coordinator. So we still miss its meta. For other partitions, we can reuse them.
     self.execute_query(
         "insert into %s.alltypes partition(year=2009, month=1) "
         "select 0,true,0,0,0,0,0,0,'a','a',NULL" % unique_database)
-    self.check_missing_partitions(unique_database, 1)
+    self.check_missing_partitions(unique_database, 1, 24)
 
-  def check_missing_partitions(self, unique_database, partition_misses):
+  def check_missing_partitions(self, unique_database, partition_misses, total_partitions):
     """Helper method for checking number of missing partitions while selecting
      all partitions of the alltypes table"""
     ret = self.execute_query_expect_success(
         self.client, "explain select count(*) from %s.alltypes" % unique_database)
+    assert ("partitions=%d" % total_partitions) in ret.get_data()
     match = re.search(r"CatalogFetch.Partitions.Misses: (\d+)", ret.runtime_profile)
     assert len(match.groups()) == 1
     assert match.group(1) == str(partition_misses)