You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jb...@apache.org on 2021/05/08 17:09:12 UTC

[impala] branch branch-4.0.0 updated: IMPALA-10692: Fix acid insert when event polling is enabled

This is an automated email from the ASF dual-hosted git repository.

jbapple pushed a commit to branch branch-4.0.0
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/branch-4.0.0 by this push:
     new 7f1a3ff  IMPALA-10692: Fix acid insert when event polling is enabled
7f1a3ff is described below

commit 7f1a3ff69b49331bf310d34e80dbdb6929833830
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Sat May 1 16:19:19 2021 +0200

    IMPALA-10692: Fix acid insert when event polling is enabled
    
    IMPALA-10656 broke inserts into ACID tables when HMS event polling
    is enabled. The issue was that the new partitions created during the
    insert had not yet been added to the catalog table when
    createInsertEvents was called, as the table is reloaded only after
    firing the events and committing the transaction.
    
    The fix is to create the INSERT event based on the partition name
    and the fileset alone for new partitions. Already existing partitions
    need the Partition object, as we add the event to the list of the
    partition's in-flight events to detect self-events, but luckily new
    partitions don't need self-event handling because:
    - new partitions fire events only if the table is ACID
    - ACID inserts don't fire any INSERT event visible to Impala, so
      they cannot cause an unnecessary metadata reload
    
    ACID inserts from Hive work differently: they always cause an
    ALTER_TABLE or ALTER_PARTITION event, which is detected by Impala
    and leads to a metadata reload. I think that this situation is hacky
    at best because these events come before the COMMIT event (which is
    currently ignored by Impala), so Impala may reload the table too
    early (before the commit is finished).
    
    Testing:
    - added acid tables to TestEventProcessing.test_self_events
    
    Change-Id: I8c2d0702232538a746410539ad55f87b7fde57e7
    Reviewed-on: http://gerrit.cloudera.org:8080/17380
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 61 +++++++++++++++-------
 tests/custom_cluster/test_event_processing.py      | 52 ++++++++++++++----
 2 files changed, 85 insertions(+), 28 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6c22454..253102b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4669,8 +4669,9 @@ public class CatalogOpExecutor {
           Lists.newArrayList();
       // Names of the partitions that are added with add_partitions() RPC.
       // add_partitions() fires events for these partitions, so we don't need to
-      // fire an insert event.
-      Set<String> addedPartitionNames = new HashSet<>();
+      // fire an insert event. Collect the partition name both as a single string and
+      // as a list of values for convenience.
+      Map<String, List<String>> addedPartitionNames = new HashMap<>();
       addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
           newCatalogVersion);
       if (table.getNumClusteringCols() > 0) {
@@ -4742,7 +4743,7 @@ public class CatalogOpExecutor {
             for (org.apache.hadoop.hive.metastore.api.Partition part: addedHmsParts) {
               String part_name =
                   FeCatalogUtils.getPartitionName((FeFsTable)table, part.getValues());
-              addedPartitionNames.add(part_name);
+              addedPartitionNames.put(part_name, part.getValues());
             }
             if (addedHmsParts.size() > 0) {
               if (cachePoolName != null) {
@@ -4886,7 +4887,8 @@ public class CatalogOpExecutor {
    * @param tblTxn Contains the transactionId and the writeId for the insert.
    */
   private void createInsertEvents(FeFsTable table,
-      Map<String, TUpdatedPartition> updatedPartitions, Set<String> addedPartitionNames,
+      Map<String, TUpdatedPartition> updatedPartitions,
+      Map<String, List<String>> addedPartitionNames,
       boolean isInsertOverwrite, TblTransaction tblTxn) throws CatalogException {
     if (!shouldGenerateInsertEvents()) {
       return;
@@ -4907,18 +4909,13 @@ public class CatalogOpExecutor {
     boolean isPartitioned = table.getNumClusteringCols() > 0;
     // List of all insert events that we call HMS fireInsertEvent() on.
     List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
-    // List of all partitions that we insert into
-    List<HdfsPartition> partitions = new ArrayList<>();
+    // List of all existing partitions that we insert into.
+    List<HdfsPartition> existingPartitions = new ArrayList<>();
     if (isPartitioned) {
-      Set<String> partSet = updatedPartitions.keySet();
-      if (!isTransactional) {
-        // add_partitions() RPC have already fired events for new partitions in the
-        // non-transactional case.
-        partSet = new HashSet<String>(partSet);
-        partSet.removeAll(addedPartitionNames);
-      }
+      Set<String> existingPartSet = new HashSet<String>(updatedPartitions.keySet());
+      existingPartSet.removeAll(addedPartitionNames.keySet());
       // Only HdfsTable can have partitions, Iceberg tables are treated as unpartitioned.
-      partitions = ((HdfsTable) table).getPartitionsForNames(partSet);
+      existingPartitions = ((HdfsTable) table).getPartitionsForNames(existingPartSet);
     } else {
       Preconditions.checkState(updatedPartitions.size() == 1);
       // Unpartitioned tables have a single partition with empty name,
@@ -4932,7 +4929,8 @@ public class CatalogOpExecutor {
           makeInsertEventData( table, partVals, newFiles, isInsertOverwrite));
     }
 
-    for (HdfsPartition part : partitions) {
+    // Create events for existing partitions in partitioned tables.
+    for (HdfsPartition part : existingPartitions) {
       List<String> newFiles = updatedPartitions.get(part.getPartitionName()).getFiles();
       List<String> partVals  = part.getPartitionValuesAsStrings(true);
       Preconditions.checkState(!newFiles.isEmpty() || isInsertOverwrite);
@@ -4943,19 +4941,46 @@ public class CatalogOpExecutor {
           makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
     }
 
-    if (insertEventReqDatas.isEmpty()) return;
+    // Create events for new partitions only in ACID tables.
+    if (isTransactional) {
+      for (Map.Entry<String, List<String>> part : addedPartitionNames.entrySet()) {
+        List<String> newFiles = updatedPartitions.get(part.getKey()).getFiles();
+        List<String> partVals  = part.getValue();
+        Preconditions.checkState(!newFiles.isEmpty() || isInsertOverwrite);
+        Preconditions.checkState(!partVals.isEmpty());
+        LOG.info(String.format("%s new files detected for table %s new partition %s",
+            newFiles.size(), table.getFullName(), part.getKey()));
+        insertEventReqDatas.add(
+            makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
+      }
+    }
+
+    if (insertEventReqDatas.isEmpty()) {
+      return;
+    }
 
     MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
     TableInsertEventInfo insertEventInfo = new TableInsertEventInfo(
         insertEventReqDatas, isTransactional, txnId, writeId);
     List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
         insertEventInfo, table.getDb().getName(), table.getName());
+    if (isTransactional) {
+      // ACID inserts do not generate INSERT events as it is enough to listen to the
+      // COMMIT event fired by HMS. Impala ignores COMMIT events, so we don't
+      // have to worry about reloading as a result of this "self" event.
+      // Note that Hive inserts also lead to an ALTER event which is the actual event
+      // that causes Impala to reload the table.
+      Preconditions.checkState(eventIds.isEmpty());
+      return;
+    }
     if (!eventIds.isEmpty()) {
       if (!isPartitioned) { // insert into table
+        Preconditions.checkState(eventIds.size() == 1);
         catalog_.addVersionsForInflightEvents(true, (Table)table, eventIds.get(0));
       } else { // insert into partition
-        for (int par_idx = 0; par_idx < partitions.size(); par_idx++) {
-          partitions.get(par_idx).addToVersionsForInflightEvents(
+        Preconditions.checkState(existingPartitions.size() == eventIds.size());
+        for (int par_idx = 0; par_idx < existingPartitions.size(); par_idx++) {
+          existingPartitions.get(par_idx).addToVersionsForInflightEvents(
               true, eventIds.get(par_idx));
         }
       }
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 499a688..9d862aa 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -433,6 +433,8 @@ class TestEventProcessing(CustomClusterTestSuite):
 
   def __get_impala_test_queries(self, db_name, recover_tbl_name):
     tbl_name = self.__get_random_name("tbl_")
+    acid_tbl_name = self.__get_random_name("acid_tbl_")
+    acid_no_part_tbl_name = self.__get_random_name("acid_no_part_tbl_")
     tbl2 = self.__get_random_name("tbl_")
     view_name = self.__get_random_name("view_")
     # create a empty table for both partitioned and unpartitioned case for testing insert
@@ -444,6 +446,7 @@ class TestEventProcessing(CustomClusterTestSuite):
     self.client.execute(
       "create table {0}.{1} (c1 int) partitioned by (part int)".format(db_name,
         empty_partitioned_tbl))
+    acid_props = self.__get_transactional_tblproperties(True)
     self_event_test_queries = {
       # Queries which will increment the self-events-skipped counter
       True: [
@@ -478,7 +481,16 @@ class TestEventProcessing(CustomClusterTestSuite):
             db_name, tbl2),
           # compute stats will generates ALTER_PARTITION
           "compute stats {0}.{1}".format(db_name, tbl2),
-          "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)],
+          "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name),
+          # insert into a existing partition; generates INSERT self-event
+          "insert into table {0}.{1} partition "
+          "(year, month) select * from functional.alltypessmall where year=2009 "
+          "and month=1".format(db_name, tbl2),
+          # insert overwrite query from Impala also generates a INSERT self-event
+          "insert overwrite table {0}.{1} partition "
+          "(year, month) select * from functional.alltypessmall where year=2009 "
+          "and month=1".format(db_name, tbl2),
+          ],
       # Queries which will not increment the self-events-skipped counter
       False: [
           "create table {0}.{1} like functional.alltypessmall "
@@ -502,17 +514,37 @@ class TestEventProcessing(CustomClusterTestSuite):
             db_name, empty_unpartitioned_tbl),
           "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format(
             db_name, empty_partitioned_tbl),
+          # in case of ACID tables no INSERT event is generated as the COMMIT event
+          # contains the related data
+          "create table {0}.{1} (c1 int) {2}".format(db_name,
+            acid_no_part_tbl_name, acid_props),
+          "insert into table {0}.{1} values (1) ".format(db_name, acid_no_part_tbl_name),
+          "insert overwrite table {0}.{1} select * from {0}.{1}".format(
+            db_name, acid_no_part_tbl_name),
+          "truncate table {0}.{1}".format(db_name, acid_no_part_tbl_name),
+          # the table is empty so the following insert adds 0 rows
+          "insert overwrite table {0}.{1} select * from {0}.{1}".format(
+            db_name, acid_no_part_tbl_name),
+          "create table {0}.{1} (c1 int) partitioned by (part int) {2}".format(db_name,
+            acid_tbl_name, acid_props),
+          "insert into table {0}.{1} partition (part=1) "
+            "values (1) ".format(db_name, acid_tbl_name),
+          "insert into table {0}.{1} partition (part) select id, int_col "
+            "from functional.alltypestiny".format(db_name, acid_tbl_name),
+          # repeat the same insert, now it writes to existing partitions
+          "insert into table {0}.{1} partition (part) select id, int_col "
+            "from functional.alltypestiny".format(db_name, acid_tbl_name),
+          # following insert overwrite is used instead of truncate, because truncate
+          # leads to a non-self event that reloads the table
+          "insert overwrite table {0}.{1} partition (part) select id, int_col "
+            "from functional.alltypestiny where id=-1".format(db_name, acid_tbl_name),
+          # the table is empty so the following inserts add 0 rows
+          "insert overwrite table {0}.{1} partition (part) select id, int_col "
+            "from functional.alltypestiny".format(db_name, acid_tbl_name),
+          "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format(
+            db_name, acid_tbl_name),
       ]
     }
-    if HIVE_MAJOR_VERSION >= 3:
-      # insert into a existing partition; generates INSERT self-event
-      self_event_test_queries[True].append("insert into table {0}.{1} partition "
-          "(year, month) select * from functional.alltypessmall where year=2009 "
-          "and month=1".format(db_name, tbl2))
-      # insert overwrite query from Impala also generates a INSERT self-event
-      self_event_test_queries[True].append("insert overwrite table {0}.{1} partition "
-         "(year, month) select * from functional.alltypessmall where year=2009 "
-         "and month=1".format(db_name, tbl2))
     return self_event_test_queries
 
   def __get_hive_test_queries(self, db_name, recover_tbl_name):