You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/07/24 18:01:46 UTC

[5/5] impala git commit: IMPALA-3040: Remove cache directives if a partition is dropped externally

IMPALA-3040: Remove cache directives if a partition is dropped externally

HdfsTable.dropPartition() doesn't uncache the partition right now. If
the partition is dropped from Hive and refreshed in Impala, the
partition will be removed from the catalog but the cache directive
remains. Because Impala directly uses the HMS client to drop a
table/database, the cache directive won't be removed even if the table
is dropped in Impala, if the background loading runs concurrently with
the HMS client RPC call. This patch removes the cache directive in
dropPartition() if the partition is removed from HMS.

Change-Id: Id7701a499405e961456adea63f3592b43bd69170
Reviewed-on: http://gerrit.cloudera.org:8080/10792
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Tianyi Wang <tw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ac4acf1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ac4acf1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ac4acf1b

Branch: refs/heads/master
Commit: ac4acf1b77ccad95528741c255834d8ccdb84518
Parents: 8d7f638
Author: Tianyi Wang <ti...@apache.org>
Authored: Tue Jul 3 14:51:54 2018 -0700
Committer: Tianyi Wang <tw...@cloudera.com>
Committed: Tue Jul 24 17:59:41 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 40 +++++++++++++++-----
 .../impala/service/CatalogOpExecutor.java       |  3 --
 tests/query_test/test_hdfs_caching.py           | 19 ++++++++++
 3 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 957b1f9..b4bd707 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -59,6 +59,7 @@ import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
@@ -1150,8 +1151,10 @@ public class HdfsTable extends Table implements FeFsTable {
   /**
    * Drops a partition and updates partition column statistics. Returns the
    * HdfsPartition that was dropped or null if the partition does not exist.
+   * If removeCacheDirective = true, any cache directive on the partition is removed.
    */
-  private HdfsPartition dropPartition(HdfsPartition partition) {
+  private HdfsPartition dropPartition(HdfsPartition partition,
+      boolean removeCacheDirective) {
     if (partition == null) return null;
     fileMetadataStats_.totalFileBytes -= partition.getSize();
     fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
@@ -1160,6 +1163,14 @@ public class HdfsTable extends Table implements FeFsTable {
     Long partitionId = partition.getId();
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
+    if (removeCacheDirective && partition.isMarkedCached()) {
+      try {
+        HdfsCachingUtil.removePartitionCacheDirective(partition);
+      } catch (ImpalaException e) {
+        LOG.error("Unable to remove the cache directive on table " + getFullName() +
+            ", partition " + partition.getPartitionName() + ": ", e);
+      }
+    }
     if (!isStoredInImpaladCatalogCache()) return partition;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
@@ -1185,20 +1196,29 @@ public class HdfsTable extends Table implements FeFsTable {
     return partition;
   }
 
+  private HdfsPartition dropPartition(HdfsPartition partition) {
+    return dropPartition(partition, true);
+  }
+
   /**
    * Drops the given partitions from this table. Cleans up its metadata from all the
    * mappings used to speed up partition pruning/lookup. Also updates partitions column
    * statistics. Returns the list of partitions that were dropped.
    */
-  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions,
+      boolean removeCacheDirective) {
     ArrayList<HdfsPartition> droppedPartitions = Lists.newArrayList();
     for (HdfsPartition partition: partitions) {
-      HdfsPartition hdfsPartition = dropPartition(partition);
+      HdfsPartition hdfsPartition = dropPartition(partition, removeCacheDirective);
       if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
     }
     return droppedPartitions;
   }
 
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+    return dropPartitions(partitions, true);
+  }
+
   /**
    * Update the prototype partition used when creating new partitions for
    * this table. New partitions will inherit storage properties from the
@@ -1360,16 +1380,15 @@ public class HdfsTable extends Table implements FeFsTable {
     Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
     // Partitions that need to be dropped and recreated from scratch
     List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
-    // Partitions that need to be removed from this table. That includes dirty
-    // partitions as well as partitions that were removed from the Hive Metastore.
-    List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
+    // Partitions removed from the Hive Metastore.
+    List<HdfsPartition> removedPartitions = Lists.newArrayList();
     // Identify dirty partitions that need to be loaded from the Hive Metastore and
     // partitions that no longer exist in the Hive Metastore.
     for (HdfsPartition partition: partitionMap_.values()) {
       // Remove partitions that don't exist in the Hive Metastore. These are partitions
       // that were removed from HMS using some external process, e.g. Hive.
       if (!msPartitionNames.contains(partition.getPartitionName())) {
-        partitionsToRemove.add(partition);
+        removedPartitions.add(partition);
       }
       if (partition.isDirty()) {
         // Dirty partitions are updated by removing them from table's partition
@@ -1391,8 +1410,9 @@ public class HdfsTable extends Table implements FeFsTable {
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
       partitionNames.add(partition.getPartitionName());
     }
-    partitionsToRemove.addAll(dirtyPartitions);
-    dropPartitions(partitionsToRemove);
+    dropPartitions(removedPartitions);
+    // dirtyPartitions are reloaded and hence cache directives are not dropped.
+    dropPartitions(dirtyPartitions, false);
     // Load dirty partitions from Hive Metastore
     loadPartitionsFromMetastore(dirtyPartitions, client);
 
@@ -2146,7 +2166,7 @@ public class HdfsTable extends Table implements FeFsTable {
     refreshPartitionFileMetadata(refreshedPartition);
     Preconditions.checkArgument(oldPartition == null
         || HdfsPartition.KV_COMPARATOR.compare(oldPartition, refreshedPartition) == 0);
-    dropPartition(oldPartition);
+    dropPartition(oldPartition, false);
     addPartition(refreshedPartition);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 39cc108..dc19e9f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2270,9 +2270,6 @@ public class CatalogOpExecutor {
           msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(),
               part.getPartitionValuesAsStrings(true), dropOptions);
           ++numTargetedPartitions;
-          if (part.isMarkedCached()) {
-            HdfsCachingUtil.removePartitionCacheDirective(part);
-          }
         } catch (NoSuchObjectException e) {
           if (!ifExists) {
             throw new ImpalaRuntimeException(

http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/tests/query_test/test_hdfs_caching.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index f16a4a4..16940a0 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -274,6 +274,25 @@ class TestHdfsCachingDdl(ImpalaTestSuite):
     drop_cache_directives_for_path(
         "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
 
+  @pytest.mark.execute_serially
+  def test_external_drop(self):
+    """IMPALA-3040: Tests that dropping a partition in Hive leads to the removal of the
+       cache directive after a refresh statement in Impala."""
+    num_entries_pre = get_num_cache_requests()
+    self.client.execute("use cachedb")
+    self.client.execute("create table test_external_drop_tbl (i int) partitioned by "
+                        "(j int) cached in 'testPool'")
+    self.client.execute("insert into test_external_drop_tbl (i,j) select 1, 2")
+    # 1 directive for the table and 1 directive for the partition.
+    assert num_entries_pre + 2 == get_num_cache_requests()
+    self.hive_client.drop_partition("cachedb", "test_external_drop_tbl", ["2"], True)
+    self.client.execute("refresh test_external_drop_tbl")
+    # The directive on the partition is removed.
+    assert num_entries_pre + 1 == get_num_cache_requests()
+    self.client.execute("drop table test_external_drop_tbl")
+    # We want to see the number of cached entities return to the original count.
+    assert num_entries_pre == get_num_cache_requests()
+
 def drop_cache_directives_for_path(path):
   """Drop the cache directive for a given path"""
   rc, stdout, stderr = exec_process("hdfs cacheadmin -removeDirectives -path %s" % path)