You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/07/26 23:58:59 UTC

[impala] 01/02: IMPALA-8489: Partitions created by RECOVER PARTITIONS fail to create insert events with IllegalStateException.

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2551a873a0ab5636b3693711de11e2a91474d86a
Author: Anurag Mantripragada <an...@gmail.com>
AuthorDate: Fri Jul 12 12:26:52 2019 -0700

    IMPALA-8489: Partitions created by RECOVER PARTITIONS fail to create
    insert events with IllegalStateException.
    
    createInsertEvents() uses partition ids to keep track of the files
    added to each partition by the insert, computing the delta between
    the partition's files before and after the load() call. However, if
    partitions are marked dirty (e.g., partitions created by RECOVER
    PARTITIONS), load() drops and re-creates them, which changes their
    partition ids. createInsertEvents() then cannot find these partitions
    and fails with an IllegalStateException.
    
    In this patch, partitions are tracked by partition name instead of
    partition id, so dropping and re-creating them does not affect the logic.
    
    Testing:
    1. Ran TestRecoverPartitions.test_post_invalidate() which was
       failing.
    2. Ran MetastoreEventProcessorTest FE tests.
    
    Change-Id: Idef7f6aadff2868047c861ebfcc05d65f080eab9
    Reviewed-on: http://gerrit.cloudera.org:8080/13860
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  2 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 50 +++++++++++++++-------
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0255e21..99eb943 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1110,7 +1110,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * Given a set of partition names, returns the corresponding HdfsPartition
    * objects.
    */
-  private List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) {
+  public List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) {
     List<HdfsPartition> parts = Lists.newArrayListWithCapacity(partitionNames.size());
     for (String partitionName: partitionNames) {
       String partName = DEFAULT_PARTITION_NAME;
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 0951349..cf0d0fc 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -3872,22 +3873,28 @@ public class CatalogOpExecutor {
       List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
     if (!catalog_.isExternalEventProcessingEnabled() ||
         affectedExistingPartitions.size() == 0) return;
-    // Map of partition ids to file names of all existing partitions touched by the
+
+    // Map of partition names to file names of all existing partitions touched by the
     // insert.
-    Map<Long, Set<String>> partitionFilesMap = new HashMap<>();
+    Map<String, Set<String>> partitionFilesMapBeforeInsert = new HashMap<>();
     if (!isInsertOverwrite) {
-      for (FeFsPartition partition : affectedExistingPartitions) {
-        partitionFilesMap.put(partition.getId(),
-            ((HdfsPartition) partition).getFileNames());
-      }
+      partitionFilesMapBeforeInsert =
+          getPartitionNameFilesMap(affectedExistingPartitions);
     }
     // If table is partitioned, we add all existing partitions touched by this insert
-    // to the insert event. If it is not an insert overwrite operation, we find new
-    // files added by this insert.
+    // to the insert event.
     Collection<? extends FeFsPartition> partsPostInsert;
-    partsPostInsert = table.getNumClusteringCols() > 0 ?
-        ((FeFsTable)table).loadPartitions(partitionFilesMap.keySet()) :
-            FeCatalogUtils.loadAllPartitions((HdfsTable) table);
+    partsPostInsert =
+        ((HdfsTable) table).getPartitionsForNames(
+            partitionFilesMapBeforeInsert.keySet());
+
+    // If it is not an insert overwrite operation, we find new files added by this insert.
+    Map<String, Set<String>> partitionFilesMapPostInsert = new HashMap<>();
+    if (!isInsertOverwrite) {
+      partitionFilesMapPostInsert =
+          getPartitionNameFilesMap(partsPostInsert);
+    }
+
     for (FeFsPartition part : partsPostInsert) {
       // Find the delta of the files added by the insert if it is not an overwrite
       // operation. HMS fireListenerEvent() expects an empty list if no new files are
@@ -3895,14 +3902,17 @@ public class CatalogOpExecutor {
       Set<String> deltaFiles = new HashSet<>();
       List<String> partVals = null;
       if (!isInsertOverwrite) {
-        Set<String> filesPostInsert = ((HdfsPartition) part).getFileNames();
+        String partitionName = part.getPartitionName() + "/";
+        Set<String> filesPostInsert =
+            partitionFilesMapPostInsert.get(partitionName);
         if (table.getNumClusteringCols() > 0) {
-          Set<String> filesBeforeInsert = partitionFilesMap.get(part.getId());
+          Set<String> filesBeforeInsert =
+              partitionFilesMapBeforeInsert.get(partitionName);
           deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
           partVals = part.getPartitionValuesAsStrings(true);
         } else {
-          Map.Entry<Long, Set<String>> entry =
-              partitionFilesMap.entrySet().iterator().next();
+          Map.Entry<String, Set<String>> entry =
+              partitionFilesMapBeforeInsert.entrySet().iterator().next();
           deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
         }
         LOG.info("{} new files detected for table {} partition {}.",
@@ -3926,6 +3936,16 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Returns a map from each partition's name plus a trailing "/" to its set of file names.
+   */
+  private static Map<String, Set<String>> getPartitionNameFilesMap(Collection<?
+      extends FeFsPartition> partitions) {
+        return partitions.stream().collect(
+            Collectors.toMap(p -> p.getPartitionName() + "/",
+                p -> ((HdfsPartition) p).getFileNames()));
+  }
+
+  /**
    * Returns an existing, loaded table from the Catalog. Throws an exception if any
    * of the following are true:
    * - The table does not exist