Posted to commits@impala.apache.org by bo...@apache.org on 2020/10/07 14:07:07 UTC

[impala] 05/07: IMPALA-10189: don't reload partitions for drop stats

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d6f2dcab79bc80448f3241ddf24d3259aa3919cd
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Sep 24 12:35:17 2020 -0700

    IMPALA-10189: don't reload partitions for drop stats
    
    This patch avoids reloading partition metadata from the
    HMS or filesystems when partition stats are changed by
    a DROP STATS command.
    
    bulkAlterPartitions() is modified to allow callers to
    control whether partitions are marked dirty (and therefore
    reloaded).
    
    DDLs that do not mark partitions dirty need to correctly update
    the in-memory state of the Partition object so that it will
    be the same as it would be after a reload from the HMS.
    
    In this case, numRows_ was not previously updated, but now
    needs to be updated.
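
    A rough sketch of the new control point (all names are from the
    diff below):

        // MARK_DIRTY: reload the partition from the HMS and filesystem
        //             the next time the table is reloaded.
        // IN_PLACE:   swap the updated partition into the catalog
        //             directly, with no reload.
        // NONE:       the caller updates the catalog state itself.
        bulkAlterPartitions(table, modifiedParts, tblTxn,
            UpdatePartitionMethod.IN_PLACE);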
    
    Testing:
    * Ran exhaustive tests
    * Manually tested some scenarios with modifications to partitions
      done in Hive outside of Impala. The previous behaviour is preserved
      (i.e. cached partition state in Impala overwrites the Hive
      modifications).
    * Manually tested dropping and computing regular and incremental
      stats and inspected the results in both Impala and Hive. We
      already have automated test coverage for these scenarios as well.
    
    Change-Id: I91009d6e9fdf25b3a3341fd1d29219519eb39a3d
    Reviewed-on: http://gerrit.cloudera.org:8080/16516
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/catalog/HdfsPartition.java   | 27 ++++++++
 .../java/org/apache/impala/catalog/HdfsTable.java  | 13 ++--
 .../apache/impala/service/CatalogOpExecutor.java   | 76 +++++++++++++---------
 3 files changed, 79 insertions(+), 37 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index f632a39..f64a1f0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
@@ -1289,10 +1290,36 @@ public class HdfsPartition extends CatalogObjectImpl
       return this;
     }
 
+    /**
+     * Set the number of rows for this partition. setRowCountParam() and
+     * removeRowCountParam() should generally be used instead of this, except
+     * for the single "partition" in an unpartitioned table.
+     */
     public Builder setNumRows(long numRows) {
       numRows_ = numRows;
       return this;
     }
+
+    /**
+     * Update the row count in the partition's parameters and update the numRows
+     * stat to match.
+     */
+    public Builder setRowCountParam(long numRows) {
+      numRows_ = numRows;
+      putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+      return this;
+    }
+
+    /**
+     * Remove the row count from the partition's parameters and update the numRows
+     * stat to match.
+     */
+    public Builder removeRowCountParam() {
+      numRows_ = -1;
+      getParameters().remove(StatsSetupConst.ROW_COUNT);
+      return this;
+    }
+
     public Builder dropPartitionStats() { return setPartitionStatsBytes(null, false); }
     public Builder setPartitionStatsBytes(byte[] partitionStats, boolean hasIncrStats) {
       if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
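
A minimal usage sketch of the builder methods added above. The method names
come from the hunk; 'part' (an existing HdfsPartition) and 'numRows' are
assumed caller state:

    // COMPUTE STATS path: write ROW_COUNT into the partition parameters
    // and update the in-memory numRows_ stat in the same call.
    HdfsPartition.Builder withStats =
        new HdfsPartition.Builder(part).setRowCountParam(numRows);

    // DROP STATS path: remove ROW_COUNT and reset numRows_ to -1 together,
    // so the builder matches what a reload from the HMS would produce.
    HdfsPartition.Builder withoutStats =
        new HdfsPartition.Builder(part).removeRowCountParam();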
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0400d3a..c287af6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -98,9 +99,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
-import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -480,11 +481,12 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * @return true if whether there are any in-progress modifications on metadata of this
-   * partition.
+   * Marks the given partitions dirty by registering each builder for its new instance.
    */
-  public boolean isDirtyPartition(HdfsPartition partition) {
-    return dirtyPartitions_.containsKey(partition.getId());
+  public void markDirtyPartitions(Collection<HdfsPartition.Builder> partBuilders) {
+    for (HdfsPartition.Builder b : partBuilders) {
+      markDirtyPartition(b);
+    }
   }
 
   /**
@@ -1380,6 +1382,7 @@ public class HdfsTable extends Table implements FeFsTable {
       // always be reloaded (ignore the loadPartitionFileMetadata flag).
       loadTimeForFileMdNs_ = loadUpdatedPartitions(updatedPartitions);
       Preconditions.checkState(!hasInProgressModification());
+
       Set<String> addedPartitions = new HashSet<>();
       loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, addedPartitions);
       // If a list of modified partitions (old and new) is specified, don't reload file
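
The markDirtyPartitions() helper added above replaces per-builder loops at
its call sites. A sketch of the try/finally pattern used later in this
commit ('tbl' and 'modifiedParts' as in CatalogOpExecutor below):

    try {
      // NONE: write the changes to the HMS but leave the catalog's
      // in-memory state untouched; the caller updates it below.
      bulkAlterPartitions(tbl, modifiedParts, null, UpdatePartitionMethod.NONE);
    } finally {
      // Mark every modified partition dirty, even if the HMS update failed
      // part-way, so the next table reload brings the catalog back in sync.
      ((HdfsTable) tbl).markDirtyPartitions(modifiedParts);
    }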
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 0df927e..8ef85af 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -41,7 +41,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.impala.analysis.AlterTableSortByStmt;
@@ -173,7 +172,6 @@ import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
 import org.apache.impala.thrift.THdfsFileFormat;
-import org.apache.impala.thrift.TCopyTestCaseReq;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TPartitionDef;
 import org.apache.impala.thrift.TPartitionKeyValue;
@@ -1138,7 +1136,8 @@ public class CatalogOpExecutor {
     if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
       Preconditions.checkState(table instanceof HdfsTable);
       modifiedParts = updatePartitionStats(params, (HdfsTable) table);
-      bulkAlterPartitions(table, modifiedParts, tblTxn);
+      // TODO: IMPALA-10203: avoid reloading modified partitions when updating stats.
+      bulkAlterPartitions(table, modifiedParts, tblTxn, UpdatePartitionMethod.MARK_DIRTY);
     }
 
     if (params.isSetTable_stats()) {
@@ -1209,7 +1208,7 @@ public class CatalogOpExecutor {
       }
       HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
       PartitionStatsUtil.partStatsToPartition(partitionStats, partBuilder);
-      partBuilder.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+      partBuilder.setRowCountParam(numRows);
       // HMS requires this param for stats changes to take effect.
       partBuilder.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
       partBuilder.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -1642,22 +1641,22 @@ public class CatalogOpExecutor {
       HdfsPartition part = (HdfsPartition) fePart;
       HdfsPartition.Builder partBuilder = null;
       if (part.getPartitionStatsCompressed() != null) {
-        partBuilder = new HdfsPartition.Builder(part)
-            .dropPartitionStats();
+        partBuilder = new HdfsPartition.Builder(part).dropPartitionStats();
       }
 
-      // Remove the ROW_COUNT parameter if it has been set.
+      // Remove the ROW_COUNT parameter if it has been set, and update numRows to
+      // reflect the change.
       if (part.getParameters().containsKey(StatsSetupConst.ROW_COUNT)) {
         if (partBuilder == null) {
           partBuilder = new HdfsPartition.Builder(part);
         }
-        partBuilder.getParameters().remove(StatsSetupConst.ROW_COUNT);
+        partBuilder.removeRowCountParam();
       }
 
       if (partBuilder != null) modifiedParts.add(partBuilder);
     }
 
-    bulkAlterPartitions(table, modifiedParts, null);
+    bulkAlterPartitions(table, modifiedParts, null, UpdatePartitionMethod.IN_PLACE);
     return modifiedParts.size();
   }
 
@@ -3325,11 +3324,10 @@ public class CatalogOpExecutor {
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
       List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
-        modifiedParts.add(
-            new HdfsPartition.Builder(partition)
-                .setFileFormat(HdfsFileFormat.fromThrift(fileFormat)));
+        modifiedParts.add(new HdfsPartition.Builder(partition).setFileFormat(
+            HdfsFileFormat.fromThrift(fileFormat)));
       }
-      bulkAlterPartitions(tbl, modifiedParts, null);
+      bulkAlterPartitions(tbl, modifiedParts, null, UpdatePartitionMethod.MARK_DIRTY);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
     }
     return reloadFileMetadata;
@@ -3367,7 +3365,7 @@ public class CatalogOpExecutor {
         HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, partBuilder.getSerdeInfo());
         modifiedParts.add(partBuilder);
       }
-      bulkAlterPartitions(tbl, modifiedParts, null);
+      bulkAlterPartitions(tbl, modifiedParts, null, UpdatePartitionMethod.MARK_DIRTY);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
     }
     return reloadFileMetadata;
@@ -3446,11 +3444,10 @@ public class CatalogOpExecutor {
         modifiedParts.add(partBuilder);
       }
       try {
-        bulkAlterPartitions(tbl, modifiedParts, null);
+        // Do not mark the partitions dirty here since it is done in the finally clause.
+        bulkAlterPartitions(tbl, modifiedParts, null, UpdatePartitionMethod.NONE);
       } finally {
-        for (HdfsPartition.Builder modifiedPart : modifiedParts) {
-          ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
-        }
+        ((HdfsTable) tbl).markDirtyPartitions(modifiedParts);
       }
       numUpdatedPartitions.setRef((long) modifiedParts.size());
     } else {
@@ -3677,11 +3674,10 @@ public class CatalogOpExecutor {
       }
     }
     try {
-      bulkAlterPartitions(tbl, modifiedParts, null);
+      // Do not mark the partitions dirty here since it is done in the finally clause.
+      bulkAlterPartitions(tbl, modifiedParts, null, UpdatePartitionMethod.NONE);
     } finally {
-      for (HdfsPartition.Builder modifiedPart : modifiedParts) {
-        ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
-      }
+      ((HdfsTable) tbl).markDirtyPartitions(modifiedParts);
     }
     numUpdatedPartitions.setRef((long) modifiedParts.size());
   }
@@ -4111,13 +4107,27 @@ public class CatalogOpExecutor {
     addSummary(resp, "Privilege(s) have been revoked.");
   }
 
+  private static enum UpdatePartitionMethod {
+    // Do not apply updates to the partition. The caller is responsible for updating
+    // the state of any modified partitions to reflect changes applied.
+    NONE,
+    // Update the state of the Partition objects in place in the catalog.
+    IN_PLACE,
+    // Mark the partition dirty so that it will be later reloaded from scratch when
+    // the table is reloaded.
+    MARK_DIRTY,
+  }
+  ;
+
   /**
-   * Alters partitions in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'. This
-   * reduces the time spent in a single update and helps avoid metastore client
+   * Alters partitions in the HMS in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'.
+   * This reduces the time spent in a single update and helps avoid metastore client
    * timeouts.
+   * @param updateMethod controls how the same updates are applied to 'tbl' to reflect
+   *                     the changes written to the HMS.
    */
   private void bulkAlterPartitions(Table tbl, List<HdfsPartition.Builder> modifiedParts,
-      TblTransaction tblTxn) throws ImpalaException {
+      TblTransaction tblTxn, UpdatePartitionMethod updateMethod) throws ImpalaException {
     // Map from msPartitions to the partition builders. Use IdentityHashMap since
     // modifications will change hash codes of msPartitions.
     Map<Partition, HdfsPartition.Builder> msPartitionToBuilders =
@@ -4147,14 +4157,16 @@ public class CatalogOpExecutor {
                 msPartitionsSubList);
           }
           // Mark the corresponding HdfsPartition objects as dirty
-          for (Partition msPartition: msPartitionsSubList) {
+          for (Partition msPartition : msPartitionsSubList) {
             HdfsPartition.Builder partBuilder = msPartitionToBuilders.get(msPartition);
             Preconditions.checkNotNull(partBuilder);
-            // TODO(IMPALA-9779): Should we always mark this as dirty? It will trigger
-            //  file meta reload for this partition. Consider remove this and mark the
-            //  "dirty" flag in callers. For those don't need to reload file meta, the
-            //  caller can build and replace the partition directly.
-            ((HdfsTable) tbl).markDirtyPartition(partBuilder);
+            // The partition either needs to be reloaded or updated in place to apply
+            // the modifications.
+            if (updateMethod == UpdatePartitionMethod.MARK_DIRTY) {
+              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
+            } else if (updateMethod == UpdatePartitionMethod.IN_PLACE) {
+              ((HdfsTable) tbl).updatePartition(partBuilder);
+            }
             // If event processing is turned on add the version number from partition
             // parameters to the HdfsPartition's list of in-flight events.
             addToInflightVersionsOfPartition(msPartition.getParameters(), partBuilder);
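
Putting the pieces together, dropPartitionStats() (second hunk of
CatalogOpExecutor.java above) now avoids any reload. A condensed sketch of
that flow; the loop scaffolding and 'partitions' are approximated from the
hunk:

    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
    for (FeFsPartition fePart : partitions) {
      HdfsPartition part = (HdfsPartition) fePart;
      HdfsPartition.Builder partBuilder = null;
      if (part.getPartitionStatsCompressed() != null) {
        partBuilder = new HdfsPartition.Builder(part).dropPartitionStats();
      }
      if (part.getParameters().containsKey(StatsSetupConst.ROW_COUNT)) {
        if (partBuilder == null) partBuilder = new HdfsPartition.Builder(part);
        // Keeps numRows_ consistent with the removed ROW_COUNT parameter.
        partBuilder.removeRowCountParam();
      }
      if (partBuilder != null) modifiedParts.add(partBuilder);
    }
    // IN_PLACE: update the catalog objects directly, no reload needed.
    bulkAlterPartitions(table, modifiedParts, null,
        UpdatePartitionMethod.IN_PLACE);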