You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/08/16 06:15:13 UTC

[impala] 01/03: IMPALA-12257: Fix NPE in createInsertEvents when partitions only exist in HMS

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7039e1ecc60bf75939803ca95414df113f68048f
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Sat Jul 1 06:15:23 2023 +0800

    IMPALA-12257: Fix NPE in createInsertEvents when partitions only exist in HMS
    
    When files have been generated for INSERT statements, coordinator
    invokes the updateCatalog RPC to let catalogd finalize the HMS changes.
    Catalogd will fire INSERT events for external tables at the end of this
    work. This helps other systems that consume HMS events (e.g. Hive
    replication or other Impala clusters) be notified of the changes. This
    step is best-effort: any errors in it should not fail the INSERT
    statement.
    
    We've seen a NullPointerException thrown in createInsertEvents() in the
    case that an updated partition doesn't exist in the metadata cache of
    catalogd but exists in HMS. Note that the partition list cached in
    catalogd could differ from the actual list in HMS, especially when HMS
    event processing is disabled. If an INSERT statement modifies a
    partition that exists in HMS but has not yet been synced to catalogd,
    createInsertEvents() will throw an NPE when looking up such a partition in
    catalogd. This patch fixes the NPE by skipping this lookup step and using
    the partition names directly.
    
    This patch also refactors createInsertEvents to make it shorter.
    
    Tests
     - Add e2e tests
    
    Change-Id: I7d77844e26283ecb16b3b3aeb9f634bb3113eacd
    Reviewed-on: http://gerrit.cloudera.org:8080/20148
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  11 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 164 ++++++++++++---------
 tests/custom_cluster/test_insert_behaviour.py      |  65 ++++++++
 3 files changed, 167 insertions(+), 73 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index dba18957b..0d8b6ba80 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1769,6 +1769,17 @@ public class HdfsTable extends Table implements FeFsTable {
     return parts;
   }
 
+  /**
+   * Tracks the in-flight INSERT event id in the partition.
+   * @return false if the partition doesn't exist. Otherwise returns true.
+   */
+  public boolean addInflightInsertEventToPartition(String partName, long eventId) {
+    HdfsPartition partition = nameToPartitionMap_.get(partName);
+    if (partition == null) return false;
+    partition.addToVersionsForInflightEvents(/*isInsertEvent*/true, eventId);
+    return true;
+  }
+
   private void setUnpartitionedTableStats(HdfsPartition.Builder partBuilder) {
     Preconditions.checkState(numClusteringCols_ == 0);
     // For unpartitioned tables set the numRows in its single partition
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index d115ea2d0..5d8391845 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -6833,9 +6835,13 @@ public class CatalogOpExecutor {
       }
 
       // Before commit fire insert events if external event processing is
-      // enabled.
-      createInsertEvents((FeFsTable)table, update.getUpdated_partitions(),
-          addedPartitionNames, update.is_overwrite, tblTxn);
+      // enabled. This is best-effort. Any errors in it should not fail the INSERT.
+      try {
+        createInsertEvents((FeFsTable) table, update.getUpdated_partitions(),
+            addedPartitionNames, update.is_overwrite, tblTxn);
+      } catch (Exception e) {
+        LOG.error("Failed to fire insert events for table {}", table.getFullName(), e);
+      }
 
       // Commit transactional inserts on success. We don't abort the transaction
       // here in case of failures, because the client, i.e. query coordinator, is
@@ -6886,10 +6892,7 @@ public class CatalogOpExecutor {
 
   /**
    * Populates insert event data and calls fireInsertEvents() if external event
-   * processing is enabled. This is no-op if event processing is disabled.
-   *   TODO: I am not sure that it is the right thing to connect event polling and
-   *         event sending to the same config. This means that turning off automatic
-   *         refresh will also break replication.
+   * processing is enabled.
    * This method is replicating what Hive does in case a table or partition is inserts
    * into. There are 2 cases:
    * 1. If the table is transactional, we should first generate ADD_PARTITION events
@@ -6911,81 +6914,57 @@ public class CatalogOpExecutor {
   private void createInsertEvents(FeFsTable table,
       Map<String, TUpdatedPartition> updatedPartitions,
       Map<String, List<String>> addedPartitionNames,
-      boolean isInsertOverwrite, TblTransaction tblTxn) throws CatalogException {
-    if (!shouldGenerateInsertEvents(table)) {
-      return;
-    }
+      boolean isInsertOverwrite, TblTransaction tblTxn)
+      throws CatalogException, MetaException {
+    if (!shouldGenerateInsertEvents(table)) return;
     long txnId = tblTxn == null ? -1 : tblTxn.txnId;
     long writeId = tblTxn == null ? -1: tblTxn.writeId;
     // If the table is transaction table we should generate a transactional
     // insert event type. This would show up in HMS as an ACID_WRITE event.
     boolean isTransactional = AcidUtils.isTransactionalTable(table.getMetaStoreTable()
         .getParameters());
-    Preconditions.checkState(!isTransactional || txnId > 0, String
-        .format("Invalid transaction id %s for generating insert events on table %s",
-            txnId, table.getFullName()));
-    Preconditions.checkState(!isTransactional || writeId > 0,
-        String.format("Invalid write id %s for generating insert events on table %s",
-            writeId, table.getFullName()));
+    if (isTransactional) {
+      Preconditions.checkState(txnId > 0, "Invalid transaction id %s for table %s",
+          txnId, table.getFullName());
+      Preconditions.checkState(writeId > 0, "Invalid write id %s for table %s",
+          writeId, table.getFullName());
+    }
 
     boolean isPartitioned = table.getNumClusteringCols() > 0;
     // List of all insert events that we call HMS fireInsertEvent() on.
     List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
     // The partition val list corresponding to insertEventReqDatas for Apache Hive-3
     List<List<String>> insertEventPartVals = new ArrayList<>();
-    // List of all existing partitions that we insert into.
-    List<HdfsPartition> existingPartitions = new ArrayList<>();
+    // List of all existing partition names in HMS that we insert into. It's possible
+    // that the partition doesn't exist in catalogd's cache.
+    // Use LinkedHashSet to preserve the order.
+    LinkedHashSet<String> existingPartSetInHms = new LinkedHashSet<>();
     if (isPartitioned) {
-      Set<String> existingPartSet = new HashSet<String>(updatedPartitions.keySet());
-      existingPartSet.removeAll(addedPartitionNames.keySet());
-      // Only HdfsTable can have partitions, Iceberg tables are treated as unpartitioned.
-      existingPartitions = ((HdfsTable) table).getPartitionsForNames(existingPartSet);
+      existingPartSetInHms.addAll(updatedPartitions.keySet());
+      existingPartSetInHms.removeAll(addedPartitionNames.keySet());
+      // Create events for existing partitions in partitioned tables.
+      for (String partName : existingPartSetInHms) {
+        List<String> partVals = MetaStoreUtil.getPartValsFromName(
+            table.getMetaStoreTable(), partName);
+        prepareInsertEventData(table, partName, partVals, updatedPartitions,
+            isInsertOverwrite, isPartitioned, insertEventReqDatas, insertEventPartVals);
+      }
     } else {
       Preconditions.checkState(updatedPartitions.size() == 1);
-      // Unpartitioned tables have a single partition with empty name,
-      // see HdfsTable.DEFAULT_PARTITION_NAME.
-      List<String> newFiles = updatedPartitions.get("").getFiles();
-      List<String> partVals = new ArrayList<>();
-      LOG.info(String.format("%s new files detected for table %s", newFiles.size(),
-          table.getFullName()));
-      if (!newFiles.isEmpty() || isInsertOverwrite) {
-        insertEventReqDatas.add(
-            makeInsertEventData( table, partVals, newFiles, isInsertOverwrite));
-        insertEventPartVals.add(partVals);
-      }
-    }
-
-    // Create events for existing partitions in partitioned tables.
-    for (HdfsPartition part : existingPartitions) {
-      List<String> newFiles = updatedPartitions.get(part.getPartitionName()).getFiles();
-      List<String> partVals  = part.getPartitionValuesAsStrings(true);
-      Preconditions.checkState(!partVals.isEmpty());
-      if (!newFiles.isEmpty() || isInsertOverwrite) {
-        LOG.info(String.format("%s new files detected for table %s partition %s",
-            newFiles.size(), table.getFullName(), part.getPartitionName()));
-        insertEventReqDatas.add(
-            makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
-        insertEventPartVals.add(partVals);
-      }
+      prepareInsertEventData(table, HdfsTable.DEFAULT_PARTITION_NAME,
+          Collections.emptyList(), updatedPartitions, isInsertOverwrite, isPartitioned,
+          insertEventReqDatas, insertEventPartVals);
     }
 
     // Create events for new partitions only in ACID tables.
     if (isTransactional) {
       for (Map.Entry<String, List<String>> part : addedPartitionNames.entrySet()) {
-        List<String> newFiles = updatedPartitions.get(part.getKey()).getFiles();
-        List<String> partVals  = part.getValue();
-        Preconditions.checkState(!partVals.isEmpty());
-        LOG.info(String.format("%s new files detected for table %s new partition %s",
-            newFiles.size(), table.getFullName(), part.getKey()));
-        insertEventReqDatas.add(
-            makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
-        insertEventPartVals.add(partVals);
+        prepareInsertEventData(table, part.getKey(), part.getValue(), updatedPartitions,
+            isInsertOverwrite, isPartitioned, insertEventReqDatas, insertEventPartVals);
       }
     }
 
-    if (insertEventReqDatas.isEmpty()) {
-      return;
-    }
+    if (insertEventReqDatas.isEmpty()) return;
 
     MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
     TableInsertEventInfo insertEventInfo = new TableInsertEventInfo(
@@ -6993,23 +6972,62 @@ public class CatalogOpExecutor {
     List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
         insertEventInfo, table.getDb().getName(), table.getName());
     if (isTransactional) {
-      // ACID inserts do not generate INSERT events as it is enough to listen to the
-      // COMMIT event fired by HMS. Impala ignores COMMIT events, so we don't
-      // have to worry about reloading as a result of this "self" event.
-      // Note that Hive inserts also lead to an ALTER event which is the actual event
-      // that causes Impala to reload the table.
+      // ACID inserts do not generate INSERT events so there is nothing we need to track
+      // here for self-event detection. Note that it is enough to listen to the commit
+      // events fired by HMS, i.e. ALLOC_WRITE_ID_EVENT and COMMIT_TXN events.
+      // For commit events triggered by Impala itself, Impala skips reloads based
+      // on the writeIds. Commit events triggered by external systems (e.g. Hive) will
+      // bring new writeIds, which causes Impala to reload the table.
+      // See more in addCommittedWriteIdsAndReloadPartitionsIfExist().
       Preconditions.checkState(eventIds.isEmpty());
       return;
     }
-    if (!eventIds.isEmpty()) {
-      if (!isPartitioned) { // insert into table
-        Preconditions.checkState(eventIds.size() == 1);
-        catalog_.addVersionsForInflightEvents(true, (Table)table, eventIds.get(0));
-      } else { // insert into partition
-        Preconditions.checkState(existingPartitions.size() == eventIds.size());
-        for (int par_idx = 0; par_idx < existingPartitions.size(); par_idx++) {
-          existingPartitions.get(par_idx).addToVersionsForInflightEvents(
-              true, eventIds.get(par_idx));
+    trackInsertEvents((HdfsTable)table, eventIds, existingPartSetInHms);
+  }
+
+  /**
+   * Helper method to prepare InsertEventRequestData. Also collects the list of
+   * partition values.
+   */
+  private void prepareInsertEventData(FeFsTable table,
+      String partName, List<String> partVals,
+      Map<String, TUpdatedPartition> updatedPartitions,
+      boolean isInsertOverwrite, boolean isPartitioned,
+      List<InsertEventRequestData> insertEventReqDatas,
+      List<List<String>> insertEventPartVals) throws CatalogException {
+    List<String> newFiles = updatedPartitions.get(partName).getFiles();
+    if (!newFiles.isEmpty() || isInsertOverwrite) {
+      LOG.info("{} new files detected for table {}{}",
+          newFiles.size(), table.getFullName(),
+          isPartitioned ? " partition " + partName : "");
+      insertEventReqDatas.add(
+          makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
+      insertEventPartVals.add(partVals);
+    }
+  }
+
+  /**
+   * Keeps track of the self-generated INSERT events for self-event detection.
+   * Each item in 'eventIds' corresponds to the partition in 'existingPartsInHms'
+   * based on the iteration order.
+   * Note that this is not called for transactional tables since INSERTs on them
+   * don't trigger INSERT events.
+   */
+  private void trackInsertEvents(HdfsTable table, List<Long> eventIds,
+      LinkedHashSet<String> existingPartsInHms) {
+    if (eventIds == null || eventIds.isEmpty()) return;
+    if (table.getNumClusteringCols() == 0) { // insert into table
+      Preconditions.checkState(eventIds.size() == 1);
+      catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0));
+    } else { // insert into partition
+      Preconditions.checkState(existingPartsInHms.size() == eventIds.size());
+      int par_idx = 0;
+      for (String partName : existingPartsInHms) {
+        long eventId = eventIds.get(par_idx++);
+        if (!table.addInflightInsertEventToPartition(partName, eventId)) {
+          LOG.warn("INSERT event {} on partition {} of table {} are not tracked since " +
+              "it doesn't exist in catalogd cache", eventId, partName,
+              table.getFullName());
         }
       }
     }
diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py
index 44076a2a6..f74ba80d2 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -20,6 +20,7 @@ import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfFS, SkipIfLocal
+from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
 from tests.util.hdfs_util import (
     HdfsConfig,
@@ -129,3 +130,67 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
       self._check_partition_perms("p1=1/p2=3/p3=4/", default_perms)
     finally:
        client.close()
+
+
+@SkipIfFS.hive
+class TestInsertUnSyncedPartition(CustomClusterTestSuite):
+
+  @classmethod
+  def setup_class(cls):
+    super(TestInsertUnSyncedPartition, cls).setup_class()
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
+  def test_insert_unsynced_partition(self, unique_database):
+    """Regression test for IMPALA-12257. Tests with event-processing disabled so
+    catalogd can easily have partitions that are not synced with HMS."""
+    self._test_insert_on_unsynced_partition(unique_database, "part1", False, False)
+    self._test_insert_on_unsynced_partition(unique_database, "part2", False, True)
+    self._test_insert_on_unsynced_partition(unique_database, "txn_part1", True, False)
+    self._test_insert_on_unsynced_partition(unique_database, "txn_part2", True, True)
+
+  def _test_insert_on_unsynced_partition(self, db, tbl, is_transactional, is_overwrite):
+    tbl_name = db + "." + tbl
+    create_stmt = """
+        create table {0} (i int) partitioned by (p int)
+        stored as textfile""".format(tbl_name)
+    if is_transactional:
+      create_stmt += """ tblproperties(
+        'transactional'='true',
+        'transactional_properties'='insert_only')"""
+    self.client.execute(create_stmt)
+    # Run any query on the table to get it loaded in catalogd.
+    self.client.execute("describe " + tbl_name)
+    # Add the partition in Hive so catalogd is not aware of it.
+    self.run_stmt_in_hive("""
+        insert into {0} partition (p=0) values (0)""".format(tbl_name))
+    # Track the last event id so we can fetch the generated events
+    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
+    # Insert the new partition in Impala.
+    self.client.execute("""
+        insert {0} {1} partition(p=0) values (1)
+        """.format("overwrite" if is_overwrite else "into", tbl_name))
+    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
+    if is_transactional:
+      assert len(events) > 2
+      assert events[0].eventType == "OPEN_TXN"
+      assert events[1].eventType == "ALLOC_WRITE_ID_EVENT"
+      assert events[1].dbName == db
+      assert events[1].tableName == tbl
+      # There is an empty ADD_PARTITION event because Impala invokes the add_partitions
+      # HMS API even though no new partitions are actually added. This might change in future Hive
+      # versions. Here we just verify whether the last event is COMMIT_TXN.
+      assert events[len(events) - 1].eventType == "COMMIT_TXN"
+    else:
+      assert len(events) > 0
+      last_event = events[len(events) - 1]
+      assert last_event.dbName == db
+      assert last_event.tableName == tbl
+      assert last_event.eventType == "INSERT"
+
+    res = self.client.execute("select * from " + tbl_name)
+    if is_overwrite:
+      assert res.data == ["1\t0"]
+    else:
+      assert "0\t0" in res.data
+      assert "1\t0" in res.data
+      assert len(res.data) == 2