Posted to commits@impala.apache.org by st...@apache.org on 2020/08/15 02:02:34 UTC

[impala] 02/02: IMPALA-4364: Query option to refresh updated HMS partitions

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cd52932125e5636ff154c3cdb6a740877b255998
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Tue Aug 4 12:27:24 2020 -0700

    IMPALA-4364: Query option to refresh updated HMS partitions
    
    This patch introduces a new boolean query option
    REFRESH_UPDATED_HMS_PARTITIONS. When this query option is set,
    the refresh table command reloads partitions which have been
    modified in HMS, in addition to adding newly created partitions
    and dropping removed ones.
    
    In order to do this, the refresh table command needs to fetch all
    the partitions instead of just the partition names, which can
    degrade the performance of refresh table when the query option is
    set. However, for certain use-cases there is currently no other
    way to detect changed partitions with the refresh table command.
    For instance, if certain partition locations have been changed
    externally, a refresh table will not update those partitions.
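
    For example, a session can opt in to the deeper refresh before
    issuing the statement (my_db.my_table below is a placeholder):

        set refresh_updated_hms_partitions=true;
        refresh my_db.my_table;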
    
    Testing:
    1. Added a new test which sets the query option and makes sure
    that partitions updated from Hive are reloaded after the refresh
    table command.
    2. Ran exhaustive tests with the patch.
    
    Change-Id: I50e8680509f4eb0712e7bb3de44df5f2952179af
    Reviewed-on: http://gerrit.cloudera.org:8080/16308
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/CatalogService.thrift                |   4 +
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   6 +
 .../impala/catalog/CatalogServiceCatalog.java      |  20 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  36 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 403 ++++++++++++++++-----
 .../apache/impala/service/CatalogOpExecutor.java   |   5 +-
 .../java/org/apache/impala/service/Frontend.java   |   6 +-
 tests/metadata/test_reset_metadata.py              |  83 +++++
 11 files changed, 466 insertions(+), 108 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 33e6d70..0b3a728 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -934,6 +934,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_fs_writers(max_fs_writers);
         break;
       }
+      case TImpalaQueryOptions::REFRESH_UPDATED_HMS_PARTITIONS: {
+        query_options->__set_refresh_updated_hms_partitions(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 82cc400..4d7bc60 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MAX_FS_WRITERS + 1);\
+      TImpalaQueryOptions::REFRESH_UPDATED_HMS_PARTITIONS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -207,6 +207,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(refresh_updated_hms_partitions,\
+      REFRESH_UPDATED_HMS_PARTITIONS, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 53c7bdd..de3cd4f 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -249,6 +249,10 @@ struct TResetMetadataRequest {
 
   // If set, refreshes authorization metadata.
   8: optional bool authorization
+
+  // If set, refreshes partition objects which are modified externally.
+  // Applicable only when refreshing the table.
+  9: optional bool refresh_updated_hms_partitions
 }
 
 // Response from TResetMetadataRequest
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 64b621a..2f0c52d 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -438,6 +438,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   108: optional i32 max_fs_writers = 0;
+
+  // See comment in ImpalaService.thrift
+  109: optional bool refresh_updated_hms_partitions = false;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2a2ed97..1d5688c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -551,6 +551,12 @@ enum TImpalaQueryOptions {
   // Sets an upper limit on the number of fs writer instances to be scheduled during
   // insert. Currently this limit only applies to HDFS inserts.
   MAX_FS_WRITERS = 107
+  // When this query option is set, a refresh table statement will detect existing
+  // partitions which have been changed in the metastore and refresh them. By default,
+  // this option is disabled since there is an additional performance hit to fetch all
+  // the partitions and detect whether they differ from the ones in the catalogd.
+  // Currently, this option only applies to the refresh table statement.
+  REFRESH_UPDATED_HMS_PARTITIONS = 108
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index c223247..f6cfeb0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2225,13 +2225,24 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Wrapper around {@link #reloadTable(Table, boolean, String)} which passes false
+   * for the {@code refreshUpdatedPartitions} argument.
+   */
+  public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
+    return reloadTable(tbl, false, reason);
+  }
+
+  /**
    * Reloads metadata for table 'tbl' which must not be an IncompleteTable. Updates the
    * table metadata in-place by calling load() on the given table. Returns the
    * TCatalogObject representing 'tbl'. Applies proper synchronization to protect the
    * metadata load from concurrent table modifications and assigns a new catalog version.
    * Throws a CatalogException if there is an error loading table metadata.
+   * If {@code refreshUpdatedParts} is true, the refresh logic detects partitions
+   * updated in the metastore and reloads them too.
    */
-  public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
+  public TCatalogObject reloadTable(Table tbl, boolean refreshUpdatedParts, String reason)
+      throws CatalogException {
     LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
@@ -2253,7 +2264,12 @@ public class CatalogServiceCatalog extends Catalog {
           throw new TableLoadingException("Error loading metadata for table: " +
               dbName + "." + tblName, e);
         }
-        tbl.load(true, msClient.getHiveClient(), msTbl, reason);
+        if (tbl instanceof HdfsTable) {
+          ((HdfsTable) tbl)
+              .load(true, msClient.getHiveClient(), msTbl, refreshUpdatedParts, reason);
+        } else {
+          tbl.load(true, msClient.getHiveClient(), msTbl, reason);
+        }
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 10eed06..c1fba61 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
@@ -906,11 +907,30 @@ public class HdfsPartition extends CatalogObjectImpl
    * metastore.
    */
   public org.apache.hadoop.hive.metastore.api.Partition toHmsPartition() {
+    StorageDescriptor storageDescriptor = getStorageDescriptor();
+    if (storageDescriptor == null) return null;
+    // Make a copy so that the callers can modify the parameters as needed.
+    Map<String, String> hmsParams = Maps.newHashMap(getParameters());
+    PartitionStatsUtil.partStatsToParams(this, hmsParams);
+    org.apache.hadoop.hive.metastore.api.Partition partition =
+        new org.apache.hadoop.hive.metastore.api.Partition(
+            getPartitionValuesAsStrings(true), getTable().getDb().getName(),
+            getTable().getName(), cachedMsPartitionDescriptor_.msCreateTime,
+            cachedMsPartitionDescriptor_.msLastAccessTime, storageDescriptor,
+            hmsParams);
+    return partition;
+  }
+
+  /**
+   * Gets the StorageDescriptor for this partition. Useful for sending RPCs to metastore
+   * and comparing against the partitions from metastore.
+   */
+  public StorageDescriptor getStorageDescriptor() {
     if (cachedMsPartitionDescriptor_ == null) return null;
     Preconditions.checkNotNull(table_.getNonPartitionFieldSchemas());
     // Update the serde library class based on the currently used file format.
-    org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor =
-        new org.apache.hadoop.hive.metastore.api.StorageDescriptor(
+    StorageDescriptor storageDescriptor =
+        new StorageDescriptor(
           // Make a shallow copy of the field schemas instead of passing a reference to
           // the source list since it could potentially be modified once the current
           // thread is out of table lock scope.
@@ -924,17 +944,7 @@ public class HdfsPartition extends CatalogObjectImpl
             cachedMsPartitionDescriptor_.sdBucketCols,
             cachedMsPartitionDescriptor_.sdSortCols,
             cachedMsPartitionDescriptor_.sdParameters);
-    // Make a copy so that the callers do not need to delete the incremental stats
-    // strings from the hmsParams_ map later.
-    Map<String, String> hmsParams = Maps.newHashMap(getParameters());
-    PartitionStatsUtil.partStatsToParams(this, hmsParams);
-    org.apache.hadoop.hive.metastore.api.Partition partition =
-        new org.apache.hadoop.hive.metastore.api.Partition(
-            getPartitionValuesAsStrings(true), getTable().getDb().getName(),
-            getTable().getName(), cachedMsPartitionDescriptor_.msCreateTime,
-            cachedMsPartitionDescriptor_.msLastAccessTime, storageDescriptor,
-            hmsParams);
-    return partition;
+    return storageDescriptor;
   }
 
   public static HdfsPartition prototypePartition(
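
Factoring getStorageDescriptor() out of toHmsPartition() is what enables the
staleness check: a caller can now compare the catalog's cached descriptor
against the metastore's copy of the partition. A minimal sketch of such a
comparison (the method name and parameters are illustrative, not part of the
patch):

    /** Returns true if the HMS copy of the partition differs from the cached one. */
    private static boolean isExternallyUpdated(HdfsPartition localPart,
        org.apache.hadoop.hive.metastore.api.Partition msPart) {
      StorageDescriptor cachedSd = localPart.getStorageDescriptor();
      // A partition without a cached descriptor cannot be compared; treat it as stale.
      if (cachedSd == null) return true;
      return !msPart.getSd().equals(cachedSd);
    }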
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 81b252c..9420779 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -33,7 +33,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -1050,7 +1049,17 @@ public class HdfsTable extends Table implements FeFsTable {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
       throws TableLoadingException {
-    load(reuseMetadata, client, msTbl, true, true, null, reason);
+    load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ true,
+        /* loadTableSchema */ true, /* refreshUpdatedPartitions */ false,
+        /* partitionsToUpdate */ null, reason);
+  }
+
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, boolean refreshUpdatedPartitions,
+      String reason) throws TableLoadingException {
+    load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ true,
+        /* loadTableSchema */ true, refreshUpdatedPartitions,
+        /* partitionsToUpdate */ null, reason);
   }
 
   /**
@@ -1080,7 +1089,8 @@ public class HdfsTable extends Table implements FeFsTable {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       boolean loadParitionFileMetadata, boolean loadTableSchema,
-      Set<String> partitionsToUpdate, String reason) throws TableLoadingException {
+      boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, String reason)
+      throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
@@ -1121,7 +1131,8 @@ public class HdfsTable extends Table implements FeFsTable {
             }
           } else {
             storageMetadataLoadTime_ += updatePartitionsFromHms(
-                client, partitionsToUpdate, loadParitionFileMetadata);
+                client, partitionsToUpdate, loadParitionFileMetadata,
+                refreshUpdatedPartitions);
           }
           LOG.info("Incrementally loaded table metadata for: " + getFullName());
         } else {
@@ -1257,80 +1268,311 @@ public class HdfsTable extends Table implements FeFsTable {
    * spent loading file metadata in nanoseconds.
    */
   private long updatePartitionsFromHms(IMetaStoreClient client,
-      Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
-      throws Exception {
-    long fileMdLoadTime = 0;
+      Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata,
+      boolean refreshUpdatedPartitions) throws Exception {
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
     Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null);
+    PartitionDeltaUpdater deltaUpdater = refreshUpdatedPartitions
+        ? new PartBasedDeltaUpdater(client, loadPartitionFileMetadata,
+            partitionsToUpdate)
+        : new PartNameBasedDeltaUpdater(client, loadPartitionFileMetadata,
+            partitionsToUpdate);
+    deltaUpdater.apply();
+    return deltaUpdater.getTotalFileMdLoadTime();
+  }
 
-    // Retrieve all the partition names from the Hive Metastore. We need this to
-    // identify the delta between partitions of the local HdfsTable and the table entry
-    // in the Hive Metastore. Note: This is a relatively "cheap" operation
-    // (~.3 secs for 30K partitions).
-    Set<String> msPartitionNames = new HashSet<>();
-    msPartitionNames.addAll(client.listPartitionNames(db_.getName(), name_, (short) -1));
-    // Names of loaded partitions in this table
-    Set<String> partitionNames = new HashSet<>();
-    // Partitions for which file metadata must be loaded
-    List<HdfsPartition.Builder> partitionsToLoadFiles = new ArrayList<>();
-    // Partitions that need to be dropped and recreated from scratch
-    List<HdfsPartition.Builder> dirtyPartitions = new ArrayList<>();
-    // Partitions removed from the Hive Metastore.
-    List<HdfsPartition> removedPartitions = new ArrayList<>();
-    // Identify dirty partitions that need to be loaded from the Hive Metastore and
-    // partitions that no longer exist in the Hive Metastore.
-    for (HdfsPartition partition: partitionMap_.values()) {
-      // Remove partitions that don't exist in the Hive Metastore. These are partitions
-      // that were removed from HMS using some external process, e.g. Hive.
-      if (!msPartitionNames.contains(partition.getPartitionName())) {
-        removedPartitions.add(partition);
-      } else if (isDirtyPartition(partition)) {
-        // The modification of dirty partitions have been started. Pick up the in-progress
-        // partition builders to finalize the modification.
-        dirtyPartitions.add(pickInprogressPartitionBuilder(partition));
-      } else if (partitionsToUpdate == null && loadPartitionFileMetadata) {
-        partitionsToLoadFiles.add(new HdfsPartition.Builder(partition));
+  /**
+   * Util class to compute the delta between the partitions known to this table and the
+   * partitions in the Hive Metastore. This is used to incrementally refresh the table.
+   * It identifies partitions which were added or removed in the metastore and adds or
+   * removes them locally. It also reloads partitions which are explicitly provided or
+   * found to be stale.
+   */
+  private abstract class PartitionDeltaUpdater {
+    // Flag used to determine if the file-metadata needs to be reloaded for stale
+    // partitions.
+    private final boolean loadFileMd_;
+    // Total time taken to load file-metadata, in nanoseconds.
+    private long loadTimeForFileMdNs_;
+    // Metastore client used to fetch partition information from the metastore.
+    protected final IMetaStoreClient client_;
+    // Nullable set of partition names which, when set, is used to force-load those
+    // partitions. If the loadFileMd_ flag is set, files for these partitions will
+    // also be reloaded.
+    private final Set<String> partitionsToUpdate_;
+
+    PartitionDeltaUpdater(IMetaStoreClient client, boolean loadPartitionFileMetadata,
+        Set<String> partitionsToUpdate) {
+      this.client_ = client;
+      this.loadFileMd_ = loadPartitionFileMetadata;
+      this.partitionsToUpdate_ = partitionsToUpdate;
+    }
+
+    /**
+     * Determines if the given HdfsPartition has been removed from the Hive Metastore.
+     * @return true if the partition does not exist in the metastore, else false.
+     */
+    public abstract boolean isRemoved(HdfsPartition hdfsPartition);
+
+    /**
+     * Loads any partitions which are known to the metastore but not present in
+     * knownPartitions. All such new partitions will be added to the given
+     * {@code addedPartNames} set.
+     * @param knownPartitions Set of partition names already known to this table.
+     * @param addedPartNames Set used to return the names of the newly added
+     *                       partitions.
+     * @return Time taken in nanoseconds for file-metadata loading of new partitions.
+     */
+    public abstract long loadNewPartitions(Set<String> knownPartitions,
+        Set<String> addedPartNames) throws Exception;
+
+    /**
+     * Gets a {@link HdfsPartition.Builder} to construct an updated HdfsPartition for
+     * the given partition.
+     */
+    public abstract HdfsPartition.Builder getUpdatedPartition(HdfsPartition partition)
+        throws Exception;
+
+    /**
+     * Loads both the HMS metadata and the file-metadata of the partitions provided by
+     * the given map of HdfsPartition.Builders.
+     * @param updatedPartitionBuilders The map of partition names and the corresponding
+     *                                 HdfsPartition.Builders which need to be loaded.
+     * @return Time taken to load file-metadata in nanoseconds.
+     */
+    public abstract long loadUpdatedPartitions(
+        Map<String, HdfsPartition.Builder> updatedPartitionBuilders) throws Exception;
+
+    /**
+     * Applies the partition delta (adds new, removes old, reloads stale partitions)
+     * based on the current state of partitions in the metastore.
+     */
+    public void apply() throws Exception {
+      List<HdfsPartition> removedPartitions = new ArrayList<>();
+      Map<String, HdfsPartition.Builder> updatedPartitions = new HashMap<>();
+      List<HdfsPartition.Builder> partitionsToLoadFiles = new ArrayList<>();
+      Set<String> partitionNames = new HashSet<>();
+      for (HdfsPartition partition: partitionMap_.values()) {
+        // Remove partitions that don't exist in the Hive Metastore. These are partitions
+        // that were removed from HMS using some external process, e.g. Hive.
+        if (isRemoved(partition)) {
+          removedPartitions.add(partition);
+        } else {
+          HdfsPartition.Builder updatedPartBuilder = getUpdatedPartition(partition);
+          if (updatedPartBuilder != null) {
+            // If there are any self-updated (dirty) or externally updated partitions
+            // add them to the list of updatedPartitions so that they are reloaded later.
+            updatedPartitions.put(partition.getPartitionName(), updatedPartBuilder);
+          } else if (loadFileMd_ && partitionsToUpdate_ == null) {
+            partitionsToLoadFiles.add(new HdfsPartition.Builder(partition));
+          }
+        }
+        Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
+        partitionNames.add(partition.getPartitionName());
       }
-      Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
-      partitionNames.add(partition.getPartitionName());
-    }
-    dropPartitions(removedPartitions);
-    // Load dirty partitions from Hive Metastore. File metadata of dirty partitions will
-    // always be reloaded (ignore the loadPartitionFileMetadata flag).
-    fileMdLoadTime += loadPartitionsFromMetastore(dirtyPartitions, client);
-    Preconditions.checkState(!hasInProgressModification());
-
-    // Identify and load partitions that were added in the Hive Metastore but don't
-    // exist in this table. File metadata of them will be loaded.
-    Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
-    fileMdLoadTime += loadPartitionsFromMetastore(newPartitionsInHms,
-        /*inprogressPartBuilders=*/null, client);
-    // If a list of modified partitions (old and new) is specified, don't reload file
-    // metadata for the new ones as they have already been detected in HMS and have been
-    // reloaded by loadPartitionsFromMetastore().
-    if (partitionsToUpdate != null) {
-      partitionsToUpdate.removeAll(newPartitionsInHms);
-    }
-
-    // Load file metadata. Until we have a notification mechanism for when a
-    // file changes in hdfs, it is sometimes required to reload all the file
-    // descriptors and block metadata of a table (e.g. REFRESH statement).
-    if (loadPartitionFileMetadata) {
+      dropPartitions(removedPartitions);
+      // Load updated (dirty or externally changed) partitions from the Hive
+      // Metastore. File metadata of these partitions will always be reloaded
+      // (ignoring the loadPartitionFileMetadata flag).
+      loadTimeForFileMdNs_ = loadUpdatedPartitions(updatedPartitions);
+      Preconditions.checkState(!hasInProgressModification());
+      Set<String> addedPartitions = new HashSet<>();
+      loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, addedPartitions);
+      // If a list of modified partitions (old and new) is specified, don't reload file
+      // metadata for the new ones as they have already been detected in HMS and have been
+      // reloaded by loadNewPartitions().
+      if (partitionsToUpdate_ != null) {
+        partitionsToUpdate_.removeAll(addedPartitions);
+      }
+      // Load file metadata. Until we have a notification mechanism for when a
+      // file changes in hdfs, it is sometimes required to reload all the file
+      // descriptors and block metadata of a table (e.g. REFRESH statement).
+      if (loadFileMd_) {
+        if (partitionsToUpdate_ != null) {
+          Preconditions.checkState(partitionsToLoadFiles.isEmpty());
+          // Only reload file metadata of partitions specified in 'partitionsToUpdate'
+          List<HdfsPartition> parts = getPartitionsForNames(partitionsToUpdate_);
+          partitionsToLoadFiles = parts.stream().map(HdfsPartition.Builder::new)
+              .collect(Collectors.toList());
+        }
+        loadTimeForFileMdNs_ += loadFileMetadataForPartitions(client_,
+            partitionsToLoadFiles, /* isRefresh=*/true);
+        updatePartitions(partitionsToLoadFiles);
+      }
+    }
+
+    /**
+     * Returns the total time taken to load file-metadata, in nanoseconds. Mostly kept
+     * for legacy reasons, to report to the coordinators the time taken to load the
+     * file-metadata.
+     */
+    public long getTotalFileMdLoadTime() {
+      return loadTimeForFileMdNs_;
+    }
+  }
+
+  /**
+   * Util class which computes the delta of partitions for this table when compared to
+   * HMS. This class fetches the partition objects from the metastore and then
+   * evaluates the delta against what is known to this HdfsTable. Unlike
+   * {@link PartNameBasedDeltaUpdater}, which only determines changes in the list of
+   * partition names, it also detects changed partitions.
+   */
+  private class PartBasedDeltaUpdater extends PartitionDeltaUpdater {
+    private final Map<String, Partition> msPartitions_ = new HashMap<>();
+    private final FsPermissionCache permCache_ = new FsPermissionCache();
+
+    public PartBasedDeltaUpdater(
+        IMetaStoreClient client, boolean loadPartitionFileMetadata,
+        Set<String> partitionsToUpdate) throws Exception {
+      super(client, loadPartitionFileMetadata, partitionsToUpdate);
+      Stopwatch sw = Stopwatch.createStarted();
+      List<Partition> partitionList;
       if (partitionsToUpdate != null) {
-        Preconditions.checkState(partitionsToLoadFiles.isEmpty());
-        // Only reload file metadata of partitions specified in 'partitionsToUpdate'
-        List<HdfsPartition> parts = getPartitionsForNames(partitionsToUpdate);
-        partitionsToLoadFiles = parts.stream().map(HdfsPartition.Builder::new)
-            .collect(Collectors.toList());
+        partitionList = MetaStoreUtil
+            .fetchPartitionsByName(client, Lists.newArrayList(partitionsToUpdate),
+                db_.getName(), name_);
+      } else {
+        partitionList =
+            MetaStoreUtil.fetchAllPartitions(
+                client_, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+      }
+      LOG.debug("Time taken to fetch all partitions of table {}: {} msec", getFullName(),
+          sw.stop().elapsed(TimeUnit.MILLISECONDS));
+      List<String> partitionColNames = getClusteringColNames();
+      for (Partition part : partitionList) {
+        msPartitions_
+            .put(MetastoreShim.makePartName(partitionColNames, part.getValues()), part);
       }
-      fileMdLoadTime += loadFileMetadataForPartitions(client, partitionsToLoadFiles,
-          /* isRefresh=*/true);
-      updatePartitions(partitionsToLoadFiles);
     }
-    return fileMdLoadTime;
+
+    @Override
+    public boolean isRemoved(HdfsPartition hdfsPartition) {
+      return !msPartitions_.containsKey(hdfsPartition.getPartitionName());
+    }
+
+    /**
+     * In addition to the dirty partitions (partitions updated by on-going table
+     * metadata changes in this catalog), this also detects staleness by comparing the
+     * {@link StorageDescriptor} of the given HdfsPartition with what is present in the
+     * Hive Metastore. This is useful for performing a "deep" table refresh so that
+     * outside changes to existing partitions (e.g. location updates) are detected.
+     */
+    @Override
+    public HdfsPartition.Builder getUpdatedPartition(HdfsPartition hdfsPartition)
+        throws Exception {
+      HdfsPartition.Builder updatedPartitionBuilder = pickInprogressPartitionBuilder(
+          hdfsPartition);
+      Partition msPartition = Preconditions
+          .checkNotNull(msPartitions_.get(hdfsPartition.getPartitionName()));
+      Preconditions.checkNotNull(msPartition.getSd());
+      // We compare the StorageDescriptor from the HdfsPartition object to the one
+      // from HMS, and if they don't match we assume that the partition has been
+      // updated in HMS. This catches the cases where partition fields, locations or
+      // file-formats are changed by external systems.
+      StorageDescriptor sd = hdfsPartition.getStorageDescriptor();
+      if (!msPartition.getSd().equals(sd)) {
+        // If updatedPartitionBuilder is null, this partition update was not from an
+        // in-progress modification in this catalog, but rather from an outside
+        // update to the partition.
+        if (updatedPartitionBuilder == null) {
+          updatedPartitionBuilder = new HdfsPartition.Builder(hdfsPartition);
+        }
+        // msPartition is different from what we have in the HdfsTable.
+        updatedPartitionBuilder = createOrUpdatePartitionBuilder(msPartition.getSd(),
+            msPartition, permCache_, updatedPartitionBuilder);
+      }
+      return updatedPartitionBuilder;
+    }
+
+    @Override
+    public long loadNewPartitions(Set<String> knownPartitions, Set<String> addedPartNames)
+        throws Exception {
+      // Get the partitions which are present in HMS but not in this table.
+      List<Partition> newMsPartitions = new ArrayList<>();
+      for (String partNameInMs : msPartitions_.keySet()) {
+        if (!knownPartitions.contains(partNameInMs)) {
+          newMsPartitions.add(msPartitions_.get(partNameInMs));
+          addedPartNames.add(partNameInMs);
+        }
+      }
+      return loadPartitionsFromMetastore(newMsPartitions,
+          /*inprogressPartBuilders=*/null, client_);
+    }
+
+    @Override
+    public long loadUpdatedPartitions(
+        Map<String, HdfsPartition.Builder> updatedPartBuilders) throws Exception {
+      List<Partition> updatedPartitions = new ArrayList<>();
+      for (String partName : updatedPartBuilders.keySet()) {
+        updatedPartitions.add(Preconditions
+            .checkNotNull(msPartitions_.get(partName)));
+      }
+      return loadPartitionsFromMetastore(updatedPartitions, updatedPartBuilders, client_);
+    }
+  }
+
+
+  /**
+   * This delta updater uses partition names to determine the delta between the
+   * metastore and the catalog. As such it is faster than {@link PartBasedDeltaUpdater},
+   * but it cannot detect partition updates other than added or removed partition names
+   * (e.g. an outside partition location update will not be detected).
+   */
+  private class PartNameBasedDeltaUpdater extends PartitionDeltaUpdater {
+    private final Set<String> partitionNamesFromHms_;
+
+    public PartNameBasedDeltaUpdater(
+        IMetaStoreClient client, boolean loadPartitionFileMetadata,
+        Set<String> partitionsToUpdate) throws Exception {
+      super(client, loadPartitionFileMetadata, partitionsToUpdate);
+      // Retrieve all the partition names from the Hive Metastore. We need this to
+      // identify the delta between partitions of the local HdfsTable and the table entry
+      // in the Hive Metastore. Note: This is a relatively "cheap" operation
+      // (~.3 secs for 30K partitions).
+      partitionNamesFromHms_ = new HashSet<>(client_
+          .listPartitionNames(db_.getName(), name_, (short) -1));
+    }
+
+    @Override
+    public boolean isRemoved(HdfsPartition hdfsPartition) {
+      return !partitionNamesFromHms_.contains(hdfsPartition.getPartitionName());
+    }
+
+    @Override
+    public HdfsPartition.Builder getUpdatedPartition(HdfsPartition hdfsPartition) {
+      return pickInprogressPartitionBuilder(hdfsPartition);
+    }
+
+    @Override
+    public long loadNewPartitions(Set<String> knownPartitionNames,
+        Set<String> addedPartNames) throws Exception {
+      // Identify and load partitions that were added in the Hive Metastore but don't
+      // exist in this table. Their file metadata will be loaded.
+      addedPartNames.addAll(Sets
+          .difference(partitionNamesFromHms_, knownPartitionNames));
+      return loadPartitionsFromMetastore(addedPartNames,
+          /*inprogressPartBuilders=*/null, client_);
+    }
+
+    @Override
+    public long loadUpdatedPartitions(
+        Map<String, HdfsPartition.Builder> updatedPartitionBuilders) throws Exception {
+      return loadPartitionsFromMetastore(updatedPartitionBuilders.keySet(),
+          updatedPartitionBuilders, client_);
+    }
+  }
+
+  /**
+   * Gets the names of partition columns.
+   */
+  public List<String> getClusteringColNames() {
+    List<String> colNames = new ArrayList<>(getNumClusteringCols());
+    for (Column column : getClusteringColumns()) {
+      colNames.add(column.name_);
+    }
+    return colNames;
   }
 
   /**
@@ -1443,26 +1685,6 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * Loads partitions from the Hive Metastore and adds them to the internal list of
-   * table partitions.
-   * @return time in nanoseconds spent in loading file metadata.
-   */
-  private long loadPartitionsFromMetastore(List<HdfsPartition.Builder> partitions,
-      IMetaStoreClient client) throws Exception {
-    Preconditions.checkNotNull(partitions);
-    if (partitions.isEmpty()) return 0;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Incrementally updating %d/%d partitions.",
-          partitions.size(), partitionMap_.size()));
-    }
-    Map<String, HdfsPartition.Builder> partBuilders = Maps.newHashMap();
-    for (HdfsPartition.Builder part: partitions) {
-      partBuilders.put(part.getPartitionName(), part);
-    }
-    return loadPartitionsFromMetastore(partBuilders.keySet(), partBuilders, client);
-  }
-
-  /**
    * Loads from the Hive Metastore and file system the partitions that correspond to
    * the specified 'partitionNames' and adds/updates them to the internal list of table
    * partitions.
@@ -1480,7 +1702,12 @@ public class HdfsTable extends Table implements FeFsTable {
     List<Partition> msPartitions = new ArrayList<>(
         MetaStoreUtil.fetchPartitionsByName(
             client, Lists.newArrayList(partitionNames), db_.getName(), name_));
+    return loadPartitionsFromMetastore(msPartitions, inprogressPartBuilders, client);
+  }
 
+  private long loadPartitionsFromMetastore(List<Partition> msPartitions,
+      Map<String, HdfsPartition.Builder> inprogressPartBuilders, IMetaStoreClient client)
+      throws Exception {
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
     List<HdfsPartition.Builder> partBuilders = new ArrayList<>(msPartitions.size());
     for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
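
Both delta updaters share the sequencing in PartitionDeltaUpdater.apply(); only
the notion of "removed" and "updated" differs between them. A compact,
simplified illustration of that split (stand-in names, not the patch's exact
signatures):

    // apply() owns the drop/reload/add ordering; subclasses only decide what
    // "removed" and "updated" mean (by partition name alone, or by also
    // comparing storage descriptors against HMS).
    abstract class DeltaUpdaterSketch {
      abstract boolean isRemoved(String partName);
      abstract boolean isUpdated(String partName);

      final void apply(java.util.Collection<String> localPartNames) {
        java.util.List<String> toDrop = new java.util.ArrayList<>();
        java.util.List<String> toReload = new java.util.ArrayList<>();
        for (String name : localPartNames) {
          if (isRemoved(name)) {
            toDrop.add(name);      // dropped in HMS: remove from the catalog
          } else if (isUpdated(name)) {
            toReload.add(name);    // stale: reload HMS and file metadata
          }
        }
        // Followed by: drop toDrop, reload toReload, then add partitions that
        // are new in HMS.
      }
    }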
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 58614ec..d893d61 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -939,7 +939,7 @@ public class CatalogOpExecutor {
           getMetaStoreTable(msClient, tbl);
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
-            reloadFileMetadata, reloadTableSchema, partitionsToUpdate, reason);
+            reloadFileMetadata, reloadTableSchema, /* refreshUpdatedPartitions */ false,
+            partitionsToUpdate, reason);
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
@@ -4207,7 +4207,8 @@ public class CatalogOpExecutor {
                 //     ACID tables, there is a Jira to cover this: HIVE-22062.
                 //   2: If no need for a full table reload then fetch partition level
                 //     writeIds and reload only the ones that changed.
-                updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
+                updatedThriftTable = catalog_
+                    .reloadTable(tbl, req.refresh_updated_hms_partitions, cmdString);
               }
             } else {
               // Table was loaded from scratch, so it's already "refreshed".
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index eb3bc69..4e1d45e 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -386,8 +386,8 @@ public class Frontend {
    * Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the
    * result argument.
    */
-  private void createCatalogOpRequest(AnalysisResult analysis,
-      TExecRequest result) throws InternalException {
+  private void createCatalogOpRequest(AnalysisResult analysis, TExecRequest result)
+      throws InternalException {
     TCatalogOpRequest ddl = new TCatalogOpRequest();
     TResultSetMetadata metadata = new TResultSetMetadata();
     if (analysis.isUseStmt()) {
@@ -667,6 +667,8 @@ public class Frontend {
     }
     if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
       ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
+      ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
+          result.getQuery_options().isRefresh_updated_hms_partitions());
     }
   }
 
diff --git a/tests/metadata/test_reset_metadata.py b/tests/metadata/test_reset_metadata.py
index 07ee7f1..59eda1c 100644
--- a/tests/metadata/test_reset_metadata.py
+++ b/tests/metadata/test_reset_metadata.py
@@ -32,3 +32,86 @@ class TestResetMetadata(TestDdlBase):
 
     self.client.execute('refresh functions %s' % unique_database)
     self.client.execute('refresh functions %s' % unique_database.upper())
+
+  def test_refresh_updated_partitions(self, unique_database):
+    """
+    Test to exercise and confirm the query option REFRESH_UPDATED_HMS_PARTITIONS
+    works as expected (IMPALA-4364).
+    """
+    tbl = unique_database + "." + "test"
+    self.client.execute(
+      "create table {0} (c1 int) partitioned by (year int, month int)".format(tbl))
+    # create 3 partitions and load data into them.
+    self.client.execute("insert into table {0} partition (year, month) "
+      "values (100, 2009, 1), (200, 2009, 2), (300, 2009, 3)".format(tbl))
+    # add a new partition from hive
+    self.run_stmt_in_hive(
+      "alter table {0} add partition (year=2020, month=8)".format(tbl))
+    self.client.execute("refresh {0}".format(tbl))
+
+    # case 1: update the partition location
+    self.run_stmt_in_hive(
+      "alter table {0} partition (year=2020, month=8) "
+      "set location 'hdfs:///tmp/year=2020/month=8'".format(tbl))
+    # first try refresh without setting the query option
+    self.execute_query("refresh {0}".format(tbl))
+    result = self.execute_query("show partitions {0}".format(tbl))
+    assert "/tmp/year=2020/month=8" not in result.get_data()
+    self.execute_query("refresh {0}".format(tbl),
+      query_options={"REFRESH_UPDATED_HMS_PARTITIONS": 0})
+    result = self.execute_query("show partitions {0}".format(tbl))
+    assert "/tmp/year=2020/month=8" not in result.get_data()
+    self.execute_query("refresh {0}".format(tbl),
+      query_options={"REFRESH_UPDATED_HMS_PARTITIONS": "False"})
+    result = self.execute_query("show partitions {0}".format(tbl))
+    assert "/tmp/year=2020/month=8" not in result.get_data()
+
+    # now issue a refresh with the query option set
+    self.execute_query("refresh {0}".format(tbl),
+      query_options={"REFRESH_UPDATED_HMS_PARTITIONS": 1})
+    result = self.execute_query("show partitions {0}".format(tbl))
+    assert "/tmp/year=2020/month=8" in result.get_data()
+
+    # change the location back to original and test using the query option
+    # set as true
+    new_loc = "/test-warehouse/{0}.db/{1}/year=2020/month=8".format(
+      unique_database, "test")
+    self.run_stmt_in_hive("alter table {0} partition (year=2020, month=8) "
+      "set location 'hdfs://{1}'".format(tbl, new_loc))
+    self.execute_query("refresh {0}".format(tbl),
+      query_options={"REFRESH_UPDATED_HMS_PARTITIONS": "true"})
+    result = self.execute_query("show partitions {0}".format(tbl))
+    assert new_loc in result.get_data()
+
+    # case 2: change the partition to a different file-format; note that the table's
+    # file-format is text.
+    # add another test partition. It should use the default file-format from the table.
+    self.execute_query("alter table {0} add partition (year=2020, month=9)".format(tbl))
+    # change the partition file-format from hive
+    self.run_stmt_in_hive("alter table {0} partition (year=2020, month=9) "
+                          "set fileformat parquet".format(tbl))
+    # make sure that refresh without the query option does not update the partition
+    self.execute_query("refresh {0}".format(tbl))
+    self.execute_query("insert into {0} partition (year=2020, month=9) "
+                       "select c1 from {0} where year=2009 and month=1".format(tbl))
+    result = self.execute_query(
+      "show files in {0} partition (year=2020, month=8)".format(tbl))
+    assert ".parq" not in result.get_data()
+    # change the file-format for another partition from hive
+    self.run_stmt_in_hive("alter table {0} partition (year=2020, month=8) "
+    "set fileformat parquet".format(tbl))
+    # now try refresh with the query option set
+    self.execute_query("refresh {0}".format(tbl),
+      query_options={"REFRESH_UPDATED_HMS_PARTITIONS": 1})
+    self.execute_query("insert into {0} partition (year=2020, month=8) "
+      "select c1 from {0} where year=2009 and month=1".format(tbl))
+    # make sure the partition year=2020/month=8 is parquet fileformat
+    result = self.execute_query(
+      "show files in {0} partition (year=2020, month=8)".format(tbl))
+    assert ".parq" in result.get_data()
+    # make sure that the other partitions are still in text format
+    self.execute_query("insert into {0} partition (year=2020, month=1) "
+      "select c1 from {0} where year=2009 and month=1".format(tbl))
+    result = self.execute_query(
+      "show files in {0} partition (year=2020, month=1)".format(tbl))
+    assert ".txt" in result.get_data()