You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/25 15:45:27 UTC

[impala] 15/20: IMPALA-3040: Remove cache directives if a partition is dropped externally

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 45d2ce7d3b953fbc325608bc378ee38a1b506a49
Author: Tianyi Wang <ti...@apache.org>
AuthorDate: Tue Jul 3 14:51:54 2018 -0700

    IMPALA-3040: Remove cache directives if a partition is dropped externally
    
    HdfsTable.dropPartition() doesn't uncache the partition right now. If
    the partition is dropped from Hive and refreshed in Impala, the
    partition will be removed from the catalog but the cache directive
    remains. Because Impala directly uses HMS client to drop a
    table/database, the cache directive won't be removed even if the table
    is dropped in Impala, if the background loading is run concurrently with
    the HMS client RPC call. This patch removes the cache directive in
    dropPartition() if the partition is removed from HMS.
    
    Change-Id: Id7701a499405e961456adea63f3592b43bd69170
    Reviewed-on: http://gerrit.cloudera.org:8080/10792
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Tianyi Wang <tw...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 40 ++++++++++++++++------
 .../apache/impala/service/CatalogOpExecutor.java   |  3 --
 tests/query_test/test_hdfs_caching.py              | 19 ++++++++++
 3 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 5f2fc05..0e472a3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -59,6 +59,7 @@ import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
@@ -1149,8 +1150,10 @@ public class HdfsTable extends Table implements FeFsTable {
   /**
    * Drops a partition and updates partition column statistics. Returns the
    * HdfsPartition that was dropped or null if the partition does not exist.
+   * If removeCacheDirective = true, any cache directive on the partition is removed.
    */
-  private HdfsPartition dropPartition(HdfsPartition partition) {
+  private HdfsPartition dropPartition(HdfsPartition partition,
+      boolean removeCacheDirective) {
     if (partition == null) return null;
     fileMetadataStats_.totalFileBytes -= partition.getSize();
     fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
@@ -1159,6 +1162,14 @@ public class HdfsTable extends Table implements FeFsTable {
     Long partitionId = partition.getId();
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
+    if (removeCacheDirective && partition.isMarkedCached()) {
+      try {
+        HdfsCachingUtil.removePartitionCacheDirective(partition);
+      } catch (ImpalaException e) {
+        LOG.error("Unable to remove the cache directive on table " + getFullName() +
+            ", partition " + partition.getPartitionName() + ": ", e);
+      }
+    }
     if (!isStoredInImpaladCatalogCache()) return partition;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
@@ -1184,20 +1195,29 @@ public class HdfsTable extends Table implements FeFsTable {
     return partition;
   }
 
+  private HdfsPartition dropPartition(HdfsPartition partition) {
+    return dropPartition(partition, true);
+  }
+
   /**
    * Drops the given partitions from this table. Cleans up its metadata from all the
    * mappings used to speed up partition pruning/lookup. Also updates partitions column
    * statistics. Returns the list of partitions that were dropped.
    */
-  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions,
+      boolean removeCacheDirective) {
     ArrayList<HdfsPartition> droppedPartitions = Lists.newArrayList();
     for (HdfsPartition partition: partitions) {
-      HdfsPartition hdfsPartition = dropPartition(partition);
+      HdfsPartition hdfsPartition = dropPartition(partition, removeCacheDirective);
       if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
     }
     return droppedPartitions;
   }
 
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+    return dropPartitions(partitions, true);
+  }
+
   /**
    * Update the prototype partition used when creating new partitions for
    * this table. New partitions will inherit storage properties from the
@@ -1359,16 +1379,15 @@ public class HdfsTable extends Table implements FeFsTable {
     Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
     // Partitions that need to be dropped and recreated from scratch
     List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
-    // Partitions that need to be removed from this table. That includes dirty
-    // partitions as well as partitions that were removed from the Hive Metastore.
-    List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
+    // Partitions removed from the Hive Metastore.
+    List<HdfsPartition> removedPartitions = Lists.newArrayList();
     // Identify dirty partitions that need to be loaded from the Hive Metastore and
     // partitions that no longer exist in the Hive Metastore.
     for (HdfsPartition partition: partitionMap_.values()) {
       // Remove partitions that don't exist in the Hive Metastore. These are partitions
       // that were removed from HMS using some external process, e.g. Hive.
       if (!msPartitionNames.contains(partition.getPartitionName())) {
-        partitionsToRemove.add(partition);
+        removedPartitions.add(partition);
       }
       if (partition.isDirty()) {
         // Dirty partitions are updated by removing them from table's partition
@@ -1390,8 +1409,9 @@ public class HdfsTable extends Table implements FeFsTable {
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
       partitionNames.add(partition.getPartitionName());
     }
-    partitionsToRemove.addAll(dirtyPartitions);
-    dropPartitions(partitionsToRemove);
+    dropPartitions(removedPartitions);
+    // dirtyPartitions are reloaded and hence cache directives are not dropped.
+    dropPartitions(dirtyPartitions, false);
     // Load dirty partitions from Hive Metastore
     loadPartitionsFromMetastore(dirtyPartitions, client);
 
@@ -2145,7 +2165,7 @@ public class HdfsTable extends Table implements FeFsTable {
     refreshPartitionFileMetadata(refreshedPartition);
     Preconditions.checkArgument(oldPartition == null
         || HdfsPartition.KV_COMPARATOR.compare(oldPartition, refreshedPartition) == 0);
-    dropPartition(oldPartition);
+    dropPartition(oldPartition, false);
     addPartition(refreshedPartition);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 39cc108..dc19e9f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2270,9 +2270,6 @@ public class CatalogOpExecutor {
           msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(),
               part.getPartitionValuesAsStrings(true), dropOptions);
           ++numTargetedPartitions;
-          if (part.isMarkedCached()) {
-            HdfsCachingUtil.removePartitionCacheDirective(part);
-          }
         } catch (NoSuchObjectException e) {
           if (!ifExists) {
             throw new ImpalaRuntimeException(
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index c013ed4..beedc7c 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -273,6 +273,25 @@ class TestHdfsCachingDdl(ImpalaTestSuite):
     drop_cache_directives_for_path(
         "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
 
+  @pytest.mark.execute_serially
+  def test_external_drop(self):
+    """IMPALA-3040: Tests that dropping a partition in Hive leads to the removal of the
+       cache directive after a refresh statement in Impala."""
+    num_entries_pre = get_num_cache_requests()
+    self.client.execute("use cachedb")
+    self.client.execute("create table test_external_drop_tbl (i int) partitioned by "
+                        "(j int) cached in 'testPool'")
+    self.client.execute("insert into test_external_drop_tbl (i,j) select 1, 2")
+    # 1 directive for the table and 1 directive for the partition.
+    assert num_entries_pre + 2 == get_num_cache_requests()
+    self.hive_client.drop_partition("cachedb", "test_external_drop_tbl", ["2"], True)
+    self.client.execute("refresh test_external_drop_tbl")
+    # The directive on the partition is removed.
+    assert num_entries_pre + 1 == get_num_cache_requests()
+    self.client.execute("drop table test_external_drop_tbl")
+    # We want to see the number of cached entities return to the original count.
+    assert num_entries_pre == get_num_cache_requests()
+
 def drop_cache_directives_for_path(path):
   """Drop the cache directive for a given path"""
   rc, stdout, stderr = exec_process("hdfs cacheadmin -removeDirectives -path %s" % path)