Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/16 15:33:43 UTC

[impala] branch master updated (13fbe51 -> ee70df2)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 13fbe51  IMPALA-9838: Switch to GCC 7.5.0
     new 419aa2e  IMPALA-9778: Refactor partition modifications in DDL/DMLs
     new ee70df2  IMPALA-9858: Fix wrong partition metrics in LocalCatalog profile

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impala/catalog/CatalogServiceCatalog.java      |   6 +-
 .../org/apache/impala/catalog/FeCatalogUtils.java  |  28 +-
 .../org/apache/impala/catalog/HdfsPartition.java   | 615 +++++++++++++--------
 .../java/org/apache/impala/catalog/HdfsTable.java  | 384 ++++++++-----
 .../impala/catalog/ParallelFileMetadataLoader.java |  23 +-
 .../apache/impala/catalog/PartitionStatsUtil.java  |   2 +-
 .../main/java/org/apache/impala/catalog/Table.java |  15 +
 .../impala/catalog/local/CatalogdMetaProvider.java |   4 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 202 +++----
 .../org/apache/impala/util/HdfsCachingUtil.java    |  21 +-
 .../catalog/CatalogObjectToFromThriftTest.java     |  10 +-
 .../org/apache/impala/catalog/CatalogTest.java     |   7 +-
 .../catalog/local/CatalogdMetaProviderTest.java    |  61 +-
 13 files changed, 853 insertions(+), 525 deletions(-)


[impala] 02/02: IMPALA-9858: Fix wrong partition metrics in LocalCatalog profile

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ee70df2e9006d3592175af5f1fa7ec128f5f1b8d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Jun 15 15:07:19 2020 +0800

    IMPALA-9858: Fix wrong partition metrics in LocalCatalog profile
    
    The partition hits and requests metrics are overcounted because the hit
    count is computed from a map that has already been updated with the
    fetched misses. This patch fixes the counting and adds test coverage for
    partition metrics.
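
    The counting pattern behind the overcount, shown as a self-contained
    sketch (the map and variable names below are illustrative only, not the
    actual CatalogdMetaProvider fields): the hit count has to be captured
    before the fetched misses are merged into the same map, otherwise the
    merged map's size reports hits plus misses.

      import java.util.HashMap;
      import java.util.Map;

      public class HitCountSketch {
        public static void main(String[] args) {
          Map<String, String> cache = new HashMap<>();
          cache.put("p1", "meta1");                    // only p1 is cached

          String[] requested = {"p1", "p2", "p3"};
          Map<String, String> result = new HashMap<>();
          int numHits = 0;
          for (String ref : requested) {
            String cached = cache.get(ref);
            if (cached != null) {
              result.put(ref, cached);
              numHits++;                               // count hits before fetching misses
            }
          }
          // Fetch the misses and merge them into the same result map.
          for (String ref : requested) {
            result.computeIfAbsent(ref, r -> "fetched:" + r);
          }
          System.out.println("hits=" + numHits);       // 1: correct
          System.out.println("size=" + result.size()); // 3: hits + misses (the overcount)
        }
      }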
    
    Tests
     - Run CatalogdMetaProviderTest
    
    Change-Id: I10cabce2908f1d252b90390978e679d31003e89d
    Reviewed-on: http://gerrit.cloudera.org:8080/16080
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/local/CatalogdMetaProvider.java |  4 +-
 .../catalog/local/CatalogdMetaProviderTest.java    | 61 +++++++++++++++-------
 2 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 03aba1e..02195ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -895,8 +895,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       storePartitionsInCache(refImpl, hostIndex, fromCatalogd);
     }
     sw.stop();
-    addStatsToProfile(PARTITIONS_STATS_CATEGORY, refToMeta.size(), numMisses, sw);
-    LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(),
+    addStatsToProfile(PARTITIONS_STATS_CATEGORY, numHits, numMisses, sw);
+    LOG.trace("Request for partitions of {}: hit {}/{}", table, numHits,
         partitionRefs.size());
 
     // Convert the returned map to be by-name instead of by-ref.
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index c4dcf2d..b378ce0 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -20,7 +20,6 @@ package org.apache.impala.catalog.local;
 import static org.junit.Assert.*;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +43,7 @@ import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TBriefTableMeta;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TCounter;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TRuntimeProfileNode;
@@ -58,11 +58,13 @@ import com.google.common.base.Stopwatch;
 import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 
 public class CatalogdMetaProviderTest {
 
   private final static Logger LOG = LoggerFactory.getLogger(
       CatalogdMetaProviderTest.class);
+  private final static ListMap<TNetworkAddress> HOST_INDEX = new ListMap<>();
 
   private final CatalogdMetaProvider provider_;
   private final TableMetaRef tableRef_;
@@ -113,33 +115,36 @@ public class CatalogdMetaProviderTest {
   public void testCachePartitionsByRef() throws Exception {
     List<PartitionRef> allRefs = provider_.loadPartitionList(tableRef_);
     List<PartitionRef> partialRefs = allRefs.subList(3, 8);
-    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     CacheStats stats = diffStats();
 
     // Should get no hits on the initial load of partitions.
-    Map<String, PartitionMetadata> partMap = provider_.loadPartitionsByRefs(
-        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
-        partialRefs);
+    Map<String, PartitionMetadata> partMap = loadPartitions(tableRef_, partialRefs);
     assertEquals(partialRefs.size(), partMap.size());
     stats = diffStats();
     assertEquals(0, stats.hitCount());
 
     // Load the same partitions again and we should get a hit for each partition.
-    Map<String, PartitionMetadata> partMapHit = provider_.loadPartitionsByRefs(
-        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
-        partialRefs);
+    Map<String, PartitionMetadata> partMapHit = loadPartitions(tableRef_, partialRefs);
     stats = diffStats();
     assertEquals(stats.hitCount(), partMapHit.size());
 
     // Load all of the partitions: we should get some hits and some misses.
-    Map<String, PartitionMetadata> allParts = provider_.loadPartitionsByRefs(
-        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
-        allRefs);
+    Map<String, PartitionMetadata> allParts = loadPartitions(tableRef_, allRefs);
     assertEquals(allRefs.size(), allParts.size());
     stats = diffStats();
     assertEquals(stats.hitCount(), partMapHit.size());
   }
 
+  /**
+   * Helper method for loading partitions by refs.
+   */
+  private Map<String, PartitionMetadata> loadPartitions(TableMetaRef tableMetaRef,
+     List<PartitionRef> partRefs) throws Exception {
+    return provider_.loadPartitionsByRefs(
+        tableMetaRef, /* partitionColumnNames unused by this impl */null, HOST_INDEX,
+        partRefs);
+  }
+
   @Test
   public void testCacheColumnStats() throws Exception {
     ImmutableList<String> colNames = ImmutableList.of("month", "id");
@@ -253,19 +258,35 @@ public class CatalogdMetaProviderTest {
   public void testProfile() throws Exception {
     FrontendProfile profile;
     try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      // This table has been loaded in the constructor, so this load hits the cache.
       provider_.loadTable("functional", "alltypes");
+      // Load all partition ids. This will create a PartitionLists miss.
+      List<PartitionRef> allRefs = provider_.loadPartitionList(tableRef_);
+      // Load all partitions. This will create one partition miss per partition.
+      loadPartitions(tableRef_, allRefs);
       profile = FrontendProfile.getCurrent();
     }
     TRuntimeProfileNode prof = profile.emitAsThrift();
-    assertEquals(4, prof.counters.size());
-    Collections.sort(prof.counters);
-    assertEquals("TCounter(name:CatalogFetch.Tables.Hits, unit:NONE, value:1)",
-        prof.counters.get(0).toString());
-    assertEquals("TCounter(name:CatalogFetch.Tables.Misses, unit:NONE, value:0)",
-        prof.counters.get(1).toString());
-    assertEquals("TCounter(name:CatalogFetch.Tables.Requests, unit:NONE, value:1)",
-        prof.counters.get(2).toString());
-    assertEquals("CatalogFetch.Tables.Time", prof.counters.get(3).name);
+    Map<String, TCounter> counters = Maps.uniqueIndex(prof.counters, TCounter::getName);
+    assertEquals(prof.counters.toString(), 16, counters.size());
+    assertEquals(1, counters.get("CatalogFetch.Tables.Hits").getValue());
+    assertEquals(0, counters.get("CatalogFetch.Tables.Misses").getValue());
+    assertEquals(1, counters.get("CatalogFetch.Tables.Requests").getValue());
+    assertTrue(counters.containsKey("CatalogFetch.Tables.Time"));
+    assertEquals(0, counters.get("CatalogFetch.PartitionLists.Hits").getValue());
+    assertEquals(1, counters.get("CatalogFetch.PartitionLists.Misses").getValue());
+    assertEquals(1, counters.get("CatalogFetch.PartitionLists.Requests").getValue());
+    assertTrue(counters.containsKey("CatalogFetch.PartitionLists.Time"));
+    assertEquals(0, counters.get("CatalogFetch.Partitions.Hits").getValue());
+    assertEquals(24, counters.get("CatalogFetch.Partitions.Misses").getValue());
+    assertEquals(24, counters.get("CatalogFetch.Partitions.Requests").getValue());
+    assertTrue(counters.containsKey("CatalogFetch.Partitions.Time"));
+    assertTrue(counters.containsKey("CatalogFetch.RPCs.Bytes"));
+    assertTrue(counters.containsKey("CatalogFetch.RPCs.Time"));
+    // 2 RPCs: one for fetching partition list, the other one for fetching partitions.
+    assertEquals(2, counters.get("CatalogFetch.RPCs.Requests").getValue());
+    // Should contain StorageLoad.Time since we have loaded partitions from catalogd.
+    assertTrue(counters.containsKey("CatalogFetch.StorageLoad.Time"));
   }
 
   @Test


[impala] 01/02: IMPALA-9778: Refactor partition modifications in DDL/DMLs

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 419aa2e30db326f02e9b4ec563ef7864e82df86e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon May 25 18:01:38 2020 +0800

    IMPALA-9778: Refactor partition modifications in DDL/DMLs
    
    After this patch, in DDL/DMLs that update metadata of partitions,
    instead of updating partitions in place, we always create new ones and
    use them to replace the existing instances. This is enforced by making
    HdfsPartition immutable. There are several benefits to this:
     - HdfsPartition can be shared across table versions. In full catalog
       update mode, catalog update can ignore unchanged partitions
       (IMPALA-3234) and send the update in partition granularity.
     - Aborted DDL/DMLs won't leave partition metadata in a bad shape (e.g.
       IMPALA-8406), which usually requires invalidation to recover.
     - Fetch-on-demand coordinators can cache partition meta using the
       partition id as the key. When the table version updates, only the
       metadata of changed partitions needs to be reloaded (IMPALA-7533).
     - In the work of decoupling partitions from tables (IMPALA-3127), we
       don't need to assign a catalog version to partitions since the
       partition ids already identify the partitions.
    
    However, HdfsPartition is not strictly immutable. Although all its
    fields are final, some of them still reference mutable objects. We need
    more refactoring to achieve full immutability. This patch focuses on
    refactoring the DDL/DML code paths.
    
    Changes:
     - Make all fields of HdfsPartition final. Move
       HdfsPartition constructor logic and all its update methods into
       HdfsPartition.Builder.
     - Refactor in-place updates of HdfsPartition into creating a new
       instance and dropping the old one. HdfsPartition.Builder represents
       the in-progress modifications. Once all modifications are done, its
       build() method is called to create the new HdfsPartition instance,
       which replaces the old instance only at the end of the modifications.
     - Move the "dirty" marker of HdfsPartition into a map in HdfsTable. It
       maps from the old partition id to the in-progress partition builder.
       For "dirty" partitions, we'll reload their HMS meta and file meta
       (see the sketch after this list).
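
    A rough, self-contained sketch of the resulting flow (Part, Part.Builder
    and the two maps below are simplified stand-ins for HdfsPartition,
    HdfsPartition.Builder and the HdfsTable partition/dirty maps described
    above, not the actual Impala classes):

      import java.util.HashMap;
      import java.util.Map;

      // Simplified immutable "partition": all fields are final.
      final class Part {
        final long id;
        final long numRows;
        Part(long id, long numRows) { this.id = id; this.numRows = numRows; }

        // In-progress modification of one partition.
        static final class Builder {
          final long oldId;
          long numRows;
          Builder(Part old) { this.oldId = old.id; this.numRows = old.numRows; }
          Builder setNumRows(long n) { this.numRows = n; return this; }
          Part build(long newId) { return new Part(newId, numRows); }
        }
      }

      public class BuilderFlowSketch {
        public static void main(String[] args) {
          Map<Long, Part> partitions = new HashMap<>();     // table's partition map
          Map<Long, Part.Builder> dirty = new HashMap<>();  // old id -> in-progress builder
          partitions.put(1L, new Part(1L, 100));

          // Start a modification: build from the existing instance and register it
          // as "dirty" instead of mutating the partition in place.
          Part.Builder b = new Part.Builder(partitions.get(1L)).setNumRows(42);
          dirty.put(b.oldId, b);

          // Finalize: create the new immutable instance and swap it in for the old one.
          Part.Builder inProgress = dirty.remove(1L);
          Part replacement = inProgress.build(2L);          // new instance, new id
          partitions.remove(1L);
          partitions.put(replacement.id, replacement);

          System.out.println(partitions.get(2L).numRows);   // prints 42
        }
      }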
    
    Tests:
     - No new tests are added since the existing tests already provide
       sufficient coverage
     - Run CORE tests
    
    Change-Id: Ib52e5810d01d5e0c910daacb9c98977426d3914c
    Reviewed-on: http://gerrit.cloudera.org:8080/15985
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |   6 +-
 .../org/apache/impala/catalog/FeCatalogUtils.java  |  28 +-
 .../org/apache/impala/catalog/HdfsPartition.java   | 615 +++++++++++++--------
 .../java/org/apache/impala/catalog/HdfsTable.java  | 384 ++++++++-----
 .../impala/catalog/ParallelFileMetadataLoader.java |  23 +-
 .../apache/impala/catalog/PartitionStatsUtil.java  |   2 +-
 .../main/java/org/apache/impala/catalog/Table.java |  15 +
 .../apache/impala/service/CatalogOpExecutor.java   | 202 +++----
 .../org/apache/impala/util/HdfsCachingUtil.java    |  21 +-
 .../catalog/CatalogObjectToFromThriftTest.java     |  10 +-
 .../org/apache/impala/catalog/CatalogTest.java     |   7 +-
 11 files changed, 810 insertions(+), 503 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 994abff..fafad2c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -3125,8 +3126,11 @@ public class CatalogServiceCatalog extends Catalog {
             "Unable to fetch valid transaction ids while loading file metadata for table "
                 + table.getFullName(), ex);
       }
+      List<HdfsPartition.Builder> partBuilders = partToPartialInfoMap.keySet().stream()
+          .map(HdfsPartition.Builder::new)
+          .collect(Collectors.toList());
       Map<HdfsPartition, List<FileDescriptor>> fdsByPart = new ParallelFileMetadataLoader(
-          table, partToPartialInfoMap.keySet(), reqWriteIdList, validTxnList, logPrefix)
+          table, partBuilders, reqWriteIdList, validTxnList, logPrefix)
           .loadAndGet();
       for (HdfsPartition partition : fdsByPart.keySet()) {
         TPartialPartitionInfo partitionInfo = partToPartialInfoMap.get(partition);
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index f88beb6..ccaa238 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -264,14 +265,8 @@ public abstract class FeCatalogUtils {
    * TODO: this could be a default method in FeFsPartition in Java 8.
    */
   public static String getPartitionName(FeFsPartition partition) {
-    FeFsTable table = partition.getTable();
-    List<String> partitionCols = new ArrayList<>();
-    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-      partitionCols.add(table.getColumns().get(i).getName());
-    }
-
-    return FileUtils.makePartName(
-        partitionCols, getPartitionValuesAsStrings(partition, true));
+    return getPartitionName(partition.getTable(),
+        getPartitionValuesAsStrings(partition, true));
   }
 
   // TODO: this could be a default method in FeFsPartition in Java 8.
@@ -289,6 +284,23 @@ public abstract class FeCatalogUtils {
     return ret;
   }
 
+  public static String getPartitionName(HdfsPartition.Builder partBuilder) {
+    HdfsTable table = partBuilder.getTable();
+    List<String> partitionValues = new ArrayList<>();
+    for (LiteralExpr partValue : partBuilder.getPartitionValues()) {
+      partitionValues.add(PartitionKeyValue.getPartitionKeyValueString(
+          partValue, table.getNullPartitionKeyValue()));
+    }
+    return getPartitionName(table, partitionValues);
+  }
+
+  public static String getPartitionName(FeFsTable table, List<String> partitionValues) {
+    List<String> partitionKeys = table.getClusteringColumns().stream()
+        .map(Column::getName)
+        .collect(Collectors.toList());
+    return FileUtils.makePartName(partitionKeys, partitionValues);
+  }
+
   // TODO: this could be a default method in FeFsPartition in Java 8.
   public static String getConjunctSqlForPartition(FeFsPartition part) {
     List<String> partColSql = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e7a5bb8..1cb5c30 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -80,6 +82,8 @@ import com.google.flatbuffers.FlatBufferBuilder;
  * based on their partition-key values. The comparison orders partitions in ascending
  * order with NULLs sorting last. The ordering is useful for displaying partitions
  * in SHOW statements.
+ * This class is supposed to be immutable. We should use HdfsPartition.Builder to create
+ * new instances instead of updating the fields in-place.
  */
 public class HdfsPartition implements FeFsPartition, PrunablePartition {
   /**
@@ -514,9 +518,9 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     public final boolean sdCompressed;
     public final int sdNumBuckets;
     public final org.apache.hadoop.hive.metastore.api.SerDeInfo sdSerdeInfo;
-    public final List<String> sdBucketCols;
-    public final List<org.apache.hadoop.hive.metastore.api.Order> sdSortCols;
-    public final Map<String, String> sdParameters;
+    public final ImmutableList<String> sdBucketCols;
+    public final ImmutableList<org.apache.hadoop.hive.metastore.api.Order> sdSortCols;
+    public final ImmutableMap<String, String> sdParameters;
     public final int msCreateTime;
     public final int msLastAccessTime;
 
@@ -552,6 +556,19 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         sdParameters = ImmutableMap.of();
       }
     }
+
+    private CachedHmsPartitionDescriptor(CachedHmsPartitionDescriptor other) {
+      sdInputFormat = other.sdInputFormat;
+      sdOutputFormat = other.sdOutputFormat;
+      sdCompressed = other.sdCompressed;
+      sdNumBuckets = other.sdNumBuckets;
+      sdSerdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo(other.sdSerdeInfo);
+      sdBucketCols = other.sdBucketCols;
+      sdSortCols = other.sdSortCols;
+      sdParameters = ImmutableMap.copyOf(other.sdParameters);
+      msCreateTime = other.msCreateTime;
+      msLastAccessTime = other.msLastAccessTime;
+    }
   }
 
   private final static Logger LOG = LoggerFactory.getLogger(HdfsPartition.class);
@@ -567,14 +584,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         }
       };
 
+  private final static AtomicLong partitionIdCounter_ = new AtomicLong();
   private final HdfsTable table_;
-  private final List<LiteralExpr> partitionKeyValues_;
+  private final ImmutableList<LiteralExpr> partitionKeyValues_;
   // estimated number of rows in partition; -1: unknown
-  private long numRows_ = -1;
-  private static AtomicLong partitionIdCounter_ = new AtomicLong();
+  private final long numRows_;
 
-  // A unique ID for each partition, used to identify a partition in the thrift
-  // representation of a table.
+  // A unique ID across the whole catalog for each partition, used to identify a partition
+  // in the thrift representation of a table.
   private final long id_;
 
   /*
@@ -583,7 +600,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    * we. We should therefore treat mixing formats inside one partition as user error.
    * It's easy to add per-file metadata to FileDescriptor if this changes.
    */
-  private HdfsStorageDescriptor fileFormatDescriptor_;
+  private final HdfsStorageDescriptor fileFormatDescriptor_;
 
   /**
    * The file descriptors of this partition, encoded as flatbuffers. Storing the raw
@@ -598,76 +615,55 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    *    - 4-byte padding (objects are word-aligned)
    */
   @Nonnull
-  private ImmutableList<byte[]> encodedFileDescriptors_;
-  private HdfsPartitionLocationCompressor.Location location_;
-  private boolean isDirty_;
+  private final ImmutableList<byte[]> encodedFileDescriptors_;
+  private final HdfsPartitionLocationCompressor.Location location_;
   // True if this partition is marked as cached. Does not necessarily mean the data is
   // cached.
-  private boolean isMarkedCached_;
+  private final boolean isMarkedCached_;
   private final TAccessLevel accessLevel_;
 
   // (k,v) pairs of parameters for this partition, stored in the HMS.
-  private Map<String, String> hmsParameters_;
+  private final ImmutableMap<String, String> hmsParameters_;
+  private final CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
 
   // Binary representation of the TPartitionStats for this partition. Populated
-  // when the partition is loaded and updated using setPartitionStatsBytes().
-  private byte[] partitionStats_;
+  // when the partition is being built in Builder#setPartitionStatsBytes().
+  private final byte[] partitionStats_;
 
   // True if partitionStats_ has intermediate_col_stats populated.
-  private boolean hasIncrementalStats_ ;
+  private final boolean hasIncrementalStats_ ;
 
   // The last committed write ID which modified this partition.
   // -1 means writeId_ is irrelevant(not supported).
-  private long writeId_ = -1L;
+  private final long writeId_;
 
-  private final InFlightEvents inFlightEvents_ = new InFlightEvents(20);
+  // The in-flight events of this partition tracked in catalogd. It's still mutable since
+  // it's not used in coordinators.
+  private final InFlightEvents inFlightEvents_;
 
-  private HdfsPartition(HdfsTable table,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
-      List<LiteralExpr> partitionKeyValues,
+  private HdfsPartition(HdfsTable table, long id, List<LiteralExpr> partitionKeyValues,
       HdfsStorageDescriptor fileFormatDescriptor,
-      List<HdfsPartition.FileDescriptor> fileDescriptors, long id,
-      HdfsPartitionLocationCompressor.Location location, TAccessLevel accessLevel) {
+      @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
+      HdfsPartitionLocationCompressor.Location location,
+      boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters,
+      CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
+      byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId,
+      InFlightEvents inFlightEvents) {
     table_ = table;
-    if (msPartition == null) {
-      cachedMsPartitionDescriptor_ = null;
-    } else {
-      cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(msPartition);
-    }
-    location_ = location;
+    id_ = id;
     partitionKeyValues_ = ImmutableList.copyOf(partitionKeyValues);
-    setFileDescriptors(fileDescriptors);
     fileFormatDescriptor_ = fileFormatDescriptor;
-    id_ = id;
+    encodedFileDescriptors_ = encodedFileDescriptors;
+    location_ = location;
+    isMarkedCached_ = isMarkedCached;
     accessLevel_ = accessLevel;
-    if (msPartition != null && msPartition.getParameters() != null) {
-      isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
-          msPartition.getParameters()) != null;
-      hmsParameters_ = msPartition.getParameters();
-    } else {
-      hmsParameters_ = new HashMap<>();
-    }
-    addInflightVersionsFromParameters();
-    extractAndCompressPartStats();
-    // Intern parameters after removing the incremental stats
-    hmsParameters_ = CatalogInterners.internParameters(hmsParameters_);
-    if (MetastoreShim.getMajorVersion() > 2 && msPartition != null) {
-      writeId_ = MetastoreShim.getWriteIdFromMSPartition(msPartition);
-    }
-  }
-
-  public HdfsPartition(HdfsTable table,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
-      List<LiteralExpr> partitionKeyValues,
-      HdfsStorageDescriptor fileFormatDescriptor,
-      List<HdfsPartition.FileDescriptor> fileDescriptors,
-      TAccessLevel accessLevel) {
-    this(table, msPartition, partitionKeyValues, fileFormatDescriptor, fileDescriptors,
-        partitionIdCounter_.getAndIncrement(),
-        table.getPartitionLocationCompressor().new Location(msPartition != null
-                ? msPartition.getSd().getLocation()
-                : table.getLocation()),
-        accessLevel);
+    hmsParameters_ = ImmutableMap.copyOf(hmsParameters);
+    cachedMsPartitionDescriptor_ = cachedMsPartitionDescriptor;
+    partitionStats_ = partitionStats;
+    hasIncrementalStats_ = hasIncrementalStats;
+    numRows_ = numRows;
+    writeId_ = writeId;
+    inFlightEvents_ = inFlightEvents;
   }
 
   @Override // FeFsPartition
@@ -748,40 +744,16 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
   }
 
-  public void setNumRows(long numRows) { numRows_ = numRows; }
   @Override // FeFsPartition
   public long getNumRows() { return numRows_; }
   @Override
   public boolean isMarkedCached() { return isMarkedCached_; }
-  void markCached() { isMarkedCached_ = true; }
-  private CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
-
-  /**
-   * Updates the file format of this partition and sets the corresponding input/output
-   * format classes.
-   */
-  public void setFileFormat(HdfsFileFormat fileFormat) {
-    fileFormatDescriptor_ = fileFormatDescriptor_.cloneWithChangedFileFormat(
-        fileFormat);
-    cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat();
-    cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat();
-    cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib(
-        fileFormatDescriptor_.getFileFormat().serializationLib());
-  }
 
   @Override // FeFsPartition
   public HdfsFileFormat getFileFormat() {
     return fileFormatDescriptor_.getFileFormat();
   }
 
-  public void setLocation(String place) {
-    location_ = table_.getPartitionLocationCompressor().new Location(place);
-  }
-
-  public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() {
-    return cachedMsPartitionDescriptor_.sdSerdeInfo;
-  }
-
   @Override // FeFsPartition
   public TPartitionStats getPartitionStats() {
     return PartitionStatsUtil.getPartStatsOrWarn(this);
@@ -792,38 +764,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return partitionStats_;
   }
 
-  public void setPartitionStatsBytes(byte[] partitionStats, boolean hasIncrStats) {
-    if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
-    partitionStats_ = partitionStats;
-    hasIncrementalStats_ = hasIncrStats;
-  }
-
-  /**
-   * Helper method that removes the partition stats from hmsParameters_, compresses them
-   * and updates partitionsStats_.
-   */
-  private void extractAndCompressPartStats() {
-    try {
-      // Convert the stats stored in the hmsParams map to a deflate-compressed in-memory
-      // byte array format. After conversion, delete the entries in the hmsParams map
-      // as they are not needed anymore.
-      Reference<Boolean> hasIncrStats = new Reference<Boolean>(false);
-      byte[] partitionStats =
-          PartitionStatsUtil.partStatsBytesFromParameters(hmsParameters_, hasIncrStats);
-      setPartitionStatsBytes(partitionStats, hasIncrStats.getRef());
-    } catch (ImpalaException e) {
-      LOG.warn(String.format("Failed to set partition stats for table %s partition %s",
-          getTable().getFullName(), getPartitionName()), e);
-    } finally {
-      // Delete the incremental stats entries. Cleared even on error conditions so that
-      // we do not persist the corrupt entries in the hmsParameters_ map when it is
-      // flushed to the HMS.
-      Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).clear();
-    }
-  }
-
-  public void dropPartitionStats() { setPartitionStatsBytes(null, false); }
-
   @Override // FeFsPartition
   public boolean hasIncrementalStats() { return hasIncrementalStats_; }
 
@@ -841,15 +781,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return hmsParameters_;
   }
 
-  public void putToParameters(String k, String v) {
-    Preconditions.checkArgument(!IS_INCREMENTAL_STATS_KEY.apply(k));
-    hmsParameters_.put(k, v);
-  }
-
-  public void putToParameters(Pair<String, String> kv) {
-    putToParameters(kv.first, kv.second);
-  }
-
   /**
    * Removes a given version from the in-flight events
    * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
@@ -886,33 +817,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     }
   }
 
-  /**
-   * Adds the version from the given Partition parameters. No-op if the parameters does
-   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. This is
-   * done to detect add partition events from this catalog which are generated when
-   * partitions are added or recovered.
-   */
-  private void addInflightVersionsFromParameters() {
-    Preconditions.checkNotNull(hmsParameters_);
-    Preconditions.checkState(inFlightEvents_.size(false) == 0);
-    // we should not check for table lock being held here since there are certain code
-    // paths which call this method without holding the table lock (eg. getOrLoadTable())
-    if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
-      return;
-    }
-    inFlightEvents_.add(false,
-        Long.parseLong(
-            hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
-  }
-
-  /**
-   * Marks this partition's metadata as "dirty" indicating that changes have been
-   * made and this partition's metadata should not be reused during the next
-   * incremental metadata refresh.
-   */
-  public void markDirty() { isDirty_ = true; }
-  public boolean isDirty() { return isDirty_; }
-
   @Override // FeFsPartition
   public List<LiteralExpr> getPartitionValues() { return partitionKeyValues_; }
   @Override // FeFsPartition
@@ -938,13 +842,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return fileNames;
   }
 
-  public void setFileDescriptors(List<FileDescriptor> descriptors) {
-    // Store an eagerly transformed-and-copied list so that we drop the memory usage
-    // of the flatbuffer wrapper.
-    encodedFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
-        descriptors, FileDescriptor.TO_BYTES));
-  }
-
   @Override // FeFsPartition
   public int getNumFileDescriptors() {
     return encodedFileDescriptors_.size();
@@ -995,12 +892,9 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   public static HdfsPartition prototypePartition(
       HdfsTable table, HdfsStorageDescriptor storageDescriptor) {
-    List<LiteralExpr> emptyExprList = new ArrayList<>();
-    List<FileDescriptor> emptyFileDescriptorList = new ArrayList<>();
-    return new HdfsPartition(table, null, emptyExprList,
-        storageDescriptor, emptyFileDescriptorList,
-        CatalogObjectsConstants.PROTOTYPE_PARTITION_ID, null,
-        TAccessLevel.READ_WRITE);
+    return new Builder(table, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID)
+        .setFileFormatDescriptor(storageDescriptor)
+        .build();
   }
 
   @Override
@@ -1019,87 +913,354 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       .toString();
   }
 
-  public static HdfsPartition fromThrift(HdfsTable table,
-      long id, THdfsPartition thriftPartition) {
-    HdfsStorageDescriptor storageDesc = HdfsStorageDescriptor.fromThriftPartition(
-        thriftPartition, table.getName());
+  public static class Builder {
+    // For the meaning of these fields, see field comments of HdfsPartition.
+    private HdfsTable table_;
+    private long id_;
+    private List<LiteralExpr> partitionKeyValues_;
+    private HdfsStorageDescriptor fileFormatDescriptor_ = null;
+    private ImmutableList<byte[]> encodedFileDescriptors_;
+    private HdfsPartitionLocationCompressor.Location location_ = null;
+    private boolean isMarkedCached_ = false;
+    private TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
+    private Map<String, String> hmsParameters_;
+    private CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
+    private byte[] partitionStats_ = null;
+    private boolean hasIncrementalStats_ = false;
+    private long numRows_ = -1;
+    private long writeId_ = -1L;
+    private InFlightEvents inFlightEvents_ = new InFlightEvents(20);
+
+    @Nullable
+    private HdfsPartition oldInstance_ = null;
+
+    public Builder(HdfsTable table, long id) {
+      Preconditions.checkNotNull(table);
+      table_ = table;
+      id_ = id;
+    }
+
+    public Builder(HdfsTable table) {
+      this(table, partitionIdCounter_.getAndIncrement());
+    }
 
-    List<LiteralExpr> literalExpr = new ArrayList<>();
-    if (id != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
-      List<Column> clusterCols = new ArrayList<>();
-      for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-        clusterCols.add(table.getColumns().get(i));
+    public Builder(HdfsPartition partition) {
+      this(partition.table_);
+      oldInstance_ = partition;
+      partitionKeyValues_ = partition.partitionKeyValues_;
+      fileFormatDescriptor_ = partition.fileFormatDescriptor_;
+      encodedFileDescriptors_ = partition.encodedFileDescriptors_;
+      location_ = partition.location_;
+      isMarkedCached_ = partition.isMarkedCached_;
+      accessLevel_ = partition.accessLevel_;
+      hmsParameters_ = Maps.newHashMap(partition.hmsParameters_);
+      partitionStats_ = partition.partitionStats_;
+      hasIncrementalStats_ = partition.hasIncrementalStats_;
+      numRows_ = partition.numRows_;
+      writeId_ = partition.writeId_;
+      if (partition.cachedMsPartitionDescriptor_ != null) {
+        cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(
+            partition.cachedMsPartitionDescriptor_);
       }
+      // Take over the in-flight events
+      inFlightEvents_ = partition.inFlightEvents_;
+    }
 
-      List<TExprNode> exprNodes = new ArrayList<>();
-      for (TExpr expr: thriftPartition.getPartitionKeyExprs()) {
-        for (TExprNode node: expr.getNodes()) {
-          exprNodes.add(node);
-        }
+    public HdfsPartition build() {
+      if (partitionKeyValues_ == null) partitionKeyValues_ = Collections.emptyList();
+      if (encodedFileDescriptors_ == null) setFileDescriptors(Collections.emptyList());
+      if (hmsParameters_ == null) hmsParameters_ = Collections.emptyMap();
+      if (location_ == null) {
+        // Only prototype partitions can have null locations.
+        Preconditions.checkState(id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
       }
-      Preconditions.checkState(clusterCols.size() == exprNodes.size(),
-          String.format("Number of partition columns (%d) does not match number " +
-              "of partition key expressions (%d)",
-              clusterCols.size(), exprNodes.size()));
-
-      for (int i = 0; i < exprNodes.size(); ++i) {
-        literalExpr.add(LiteralExpr.fromThrift(
-            exprNodes.get(i), clusterCols.get(i).getType()));
+      return new HdfsPartition(table_, id_, partitionKeyValues_, fileFormatDescriptor_,
+          encodedFileDescriptors_, location_, isMarkedCached_, accessLevel_,
+          hmsParameters_, cachedMsPartitionDescriptor_, partitionStats_,
+          hasIncrementalStats_, numRows_, writeId_, inFlightEvents_);
+    }
+
+    public Builder setMsPartition(
+        org.apache.hadoop.hive.metastore.api.Partition msPartition)
+        throws CatalogException {
+      if (msPartition == null) {
+        setLocation(table_.getLocation());
+        cachedMsPartitionDescriptor_ = null;
+        hmsParameters_ = Collections.emptyMap();
+        partitionKeyValues_ = Collections.emptyList();
+        return this;
       }
+      setPartitionKeyValues(
+          FeCatalogUtils.parsePartitionKeyValues(table_, msPartition.getValues()));
+      setLocation(msPartition.getSd().getLocation());
+      cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(msPartition);
+      if (msPartition.getParameters() != null) {
+        isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
+            msPartition.getParameters()) != null;
+        numRows_ = FeCatalogUtils.getRowCount(msPartition.getParameters());
+        hmsParameters_ = msPartition.getParameters();
+        extractAndCompressPartStats();
+        // Intern parameters after removing the incremental stats
+        hmsParameters_ = CatalogInterners.internParameters(hmsParameters_);
+      }
+      if (MetastoreShim.getMajorVersion() > 2) {
+        writeId_ = MetastoreShim.getWriteIdFromMSPartition(msPartition);
+      }
+      // If we have taken over the in-flight events from an old partition instance, don't
+      // overwrite the in-flight event list.
+      if (oldInstance_ == null) addInflightVersionsFromParameters();
+      return this;
     }
 
-    List<HdfsPartition.FileDescriptor> fileDescriptors = new ArrayList<>();
-    if (thriftPartition.isSetFile_desc()) {
-      for (THdfsFileDesc desc: thriftPartition.getFile_desc()) {
-        fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
+    /**
+     * Helper method that removes the partition stats from hmsParameters_, compresses them
+     * and updates partitionStats_.
+     */
+    private void extractAndCompressPartStats() {
+      try {
+        // Convert the stats stored in the hmsParams map to a deflate-compressed in-memory
+        // byte array format. After conversion, delete the entries in the hmsParams map
+        // as they are not needed anymore.
+        Reference<Boolean> hasIncrStats = new Reference<Boolean>(false);
+        byte[] partitionStats =
+            PartitionStatsUtil.partStatsBytesFromParameters(hmsParameters_, hasIncrStats);
+        setPartitionStatsBytes(partitionStats, hasIncrStats.getRef());
+      } catch (ImpalaException e) {
+        LOG.warn(String.format("Failed to set partition stats for table %s partition %s",
+            getTable().getFullName(), getPartitionName()), e);
+      } finally {
+        // Delete the incremental stats entries. Cleared even on error conditions so that
+        // we do not persist the corrupt entries in the hmsParameters_ map when it is
+        // flushed to the HMS.
+        Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).clear();
       }
     }
 
-    TAccessLevel accessLevel = thriftPartition.isSetAccess_level() ?
-        thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
-    HdfsPartitionLocationCompressor.Location location = thriftPartition.isSetLocation()
-        ? table.getPartitionLocationCompressor().new Location(
-              thriftPartition.getLocation())
-        : null;
-    HdfsPartition partition = new HdfsPartition(table, null, literalExpr, storageDesc,
-        fileDescriptors, id, location, accessLevel);
-    if (thriftPartition.isSetStats()) {
-      partition.setNumRows(thriftPartition.getStats().getNum_rows());
+    /**
+     * Updates the file format of this partition and sets the corresponding input/output
+     * format classes.
+     */
+    public Builder setFileFormat(HdfsFileFormat fileFormat) {
+      Preconditions.checkNotNull(fileFormatDescriptor_);
+      Preconditions.checkNotNull(cachedMsPartitionDescriptor_);
+      fileFormatDescriptor_ = fileFormatDescriptor_.cloneWithChangedFileFormat(
+          fileFormat);
+      cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat();
+      cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat();
+      cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib(
+          fileFormatDescriptor_.getFileFormat().serializationLib());
+      return this;
+    }
+
+    public void putToParameters(Pair<String, String> kv) {
+      putToParameters(kv.first, kv.second);
     }
-    if (thriftPartition.isSetIs_marked_cached()) {
-      partition.isMarkedCached_ = thriftPartition.isIs_marked_cached();
+    public void putToParameters(String k, String v) {
+      Preconditions.checkArgument(!IS_INCREMENTAL_STATS_KEY.apply(k));
+      Preconditions.checkNotNull(hmsParameters_);
+      hmsParameters_.put(k, v);
     }
+    public Map<String, String> getParameters() { return hmsParameters_; }
 
-    if (thriftPartition.isSetHms_parameters()) {
-      partition.hmsParameters_ = CatalogInterners.internParameters(
-          thriftPartition.getHms_parameters());
-    } else {
-      partition.hmsParameters_ = new HashMap<>();
+    public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() {
+      Preconditions.checkNotNull(cachedMsPartitionDescriptor_);
+      return cachedMsPartitionDescriptor_.sdSerdeInfo;
     }
 
-    partition.hasIncrementalStats_ = thriftPartition.has_incremental_stats;
-    if (thriftPartition.isSetPartition_stats()) {
-      partition.partitionStats_ = thriftPartition.getPartition_stats();
+    public Builder setFileFormatDescriptor(HdfsStorageDescriptor fileFormatDescriptor) {
+      fileFormatDescriptor_ = fileFormatDescriptor;
+      return this;
     }
 
-    partition.writeId_ = thriftPartition.isSetWrite_id() ?
-        thriftPartition.getWrite_id() : -1L;
+    public Builder setPartitionKeyValues(List<LiteralExpr> partitionKeyValues) {
+      partitionKeyValues_ = partitionKeyValues;
+      return this;
+    }
 
-    return partition;
-  }
+    public Builder setAccessLevel(TAccessLevel accessLevel) {
+      accessLevel_ = accessLevel;
+      return this;
+    }
 
-  /**
-   * Checks that this partition's metadata is well formed. This does not necessarily
-   * mean the partition is supported by Impala.
-   * Throws a CatalogException if there are any errors in the partition metadata.
-   */
-  public void checkWellFormed() throws CatalogException {
-    try {
-      // Validate all the partition key/values to ensure you can convert them toThrift()
-      Expr.treesToThrift(getPartitionValues());
-    } catch (Exception e) {
-      throw new CatalogException("Partition (" + getPartitionName() +
-          ") has invalid partition column values: ", e);
+    public Builder setNumRows(long numRows) {
+      numRows_ = numRows;
+      return this;
+    }
+    public Builder dropPartitionStats() { return setPartitionStatsBytes(null, false); }
+    public Builder setPartitionStatsBytes(byte[] partitionStats, boolean hasIncrStats) {
+      if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
+      partitionStats_ = partitionStats;
+      hasIncrementalStats_ = hasIncrStats;
+      return this;
+    }
+
+    public Builder setLocation(HdfsPartitionLocationCompressor.Location location) {
+      location_ = location;
+      return this;
+    }
+    public Builder setLocation(String place) {
+      location_ = table_.getPartitionLocationCompressor().new Location(place);
+      return this;
+    }
+
+    public String getLocation() {
+      return (location_ != null) ? location_.toString() : null;
+    }
+
+    public List<FileDescriptor> getFileDescriptors() {
+      // Set empty file descriptors in case setFileDescriptors() hasn't been called.
+      if (encodedFileDescriptors_ == null) setFileDescriptors(new ArrayList<>());
+      // Return a lazily transformed list from our internal bytes storage.
+      return Lists.transform(encodedFileDescriptors_, FileDescriptor.FROM_BYTES);
+    }
+
+    public Builder setFileDescriptors(List<FileDescriptor> descriptors) {
+      // Store an eagerly transformed-and-copied list so that we drop the memory usage
+      // of the flatbuffer wrapper.
+      encodedFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
+          descriptors, FileDescriptor.TO_BYTES));
+      return this;
+    }
+
+    public HdfsFileFormat getFileFormat() {
+      return fileFormatDescriptor_.getFileFormat();
+    }
+
+    public boolean isMarkedCached() { return isMarkedCached_; }
+    public Builder setIsMarkedCached(boolean isCached) {
+      isMarkedCached_ = isCached;
+      return this;
+    }
+
+    public HdfsTable getTable() { return table_; }
+
+    public String getPartitionName() {
+      return FeCatalogUtils.getPartitionName(this);
+    }
+
+    /**
+     * Adds a version number to the in-flight events of this partition
+     * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+     * events if false, add version number to list of versions for in-flight DDL events
+     * @param versionNumber when isInsertEvent is true, it's eventId to add
+     *                      when isInsertEvent is false, it's version number to add
+     * TODO: merge this with HdfsPartition.addToVersionsForInflightEvents()
+     */
+    public void addToVersionsForInflightEvents(boolean isInsertEvent,
+        long versionNumber) {
+      Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+          "addToVersionsForInflightEvents called without holding the table lock on "
+              + "partition " + getPartitionName() + " of table " + table_.getFullName());
+      if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
+        LOG.warn("Could not add {} version to the partition {} of table {}. This could " +
+                "cause unnecessary refresh of the partition when the event is received " +
+                "by the Events processor.",
+            versionNumber, getPartitionName(), getTable().getFullName());
+      }
+    }
+
+    /**
+     * Adds the version from the given Partition parameters. No-op if the parameters do
+     * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. This is
+     * done to detect add partition events from this catalog which are generated when
+     * partitions are added or recovered.
+     */
+    private void addInflightVersionsFromParameters() {
+      Preconditions.checkNotNull(hmsParameters_);
+      Preconditions.checkState(inFlightEvents_.size(false) == 0);
+      // We should not check for table lock being held here since there are certain
+      // code paths which call this method without holding the table lock
+      // (e.g. getOrLoadTable())
+      if (!hmsParameters_.containsKey(
+          MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
+        return;
+      }
+      inFlightEvents_.add(false, Long.parseLong(
+          hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
+    }
+
+    public List<LiteralExpr> getPartitionValues() {
+      return partitionKeyValues_;
+    }
+
+    public HdfsPartition getOldInstance() { return oldInstance_; }
+    public long getOldId() { return Preconditions.checkNotNull(oldInstance_).id_; }
+
+    public org.apache.hadoop.hive.metastore.api.Partition toHmsPartition() {
+      // Build a temp HdfsPartition to create the HmsPartition.
+      return build().toHmsPartition();
+    }
+
+    public Builder fromThrift(THdfsPartition thriftPartition) {
+      fileFormatDescriptor_ = HdfsStorageDescriptor.fromThriftPartition(
+          thriftPartition, table_.getName());
+
+      partitionKeyValues_ = new ArrayList<>();
+      if (id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
+        List<Column> clusterCols = table_.getClusteringColumns();
+        List<TExprNode> exprNodes = new ArrayList<>();
+        for (TExpr expr : thriftPartition.getPartitionKeyExprs()) {
+          exprNodes.addAll(expr.getNodes());
+        }
+        Preconditions.checkState(clusterCols.size() == exprNodes.size(),
+            String.format("Number of partition columns (%d) does not match number " +
+                    "of partition key expressions (%d)",
+                clusterCols.size(), exprNodes.size()));
+
+        for (int i = 0; i < exprNodes.size(); ++i) {
+          partitionKeyValues_.add(LiteralExpr.fromThrift(
+              exprNodes.get(i), clusterCols.get(i).getType()));
+        }
+      }
+
+      List<FileDescriptor> fileDescriptors = new ArrayList<>();
+      if (thriftPartition.isSetFile_desc()) {
+        for (THdfsFileDesc desc : thriftPartition.getFile_desc()) {
+          fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
+        }
+      }
+      setFileDescriptors(fileDescriptors);
+
+      accessLevel_ = thriftPartition.isSetAccess_level() ?
+          thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
+      location_ = thriftPartition.isSetLocation()
+          ? table_.getPartitionLocationCompressor().new Location(
+          thriftPartition.getLocation())
+          : null;
+      if (thriftPartition.isSetStats()) {
+        numRows_ = thriftPartition.getStats().getNum_rows();
+      }
+      hasIncrementalStats_ = thriftPartition.has_incremental_stats;
+      if (thriftPartition.isSetPartition_stats()) {
+        partitionStats_ = thriftPartition.getPartition_stats();
+      }
+      if (thriftPartition.isSetIs_marked_cached()) {
+        isMarkedCached_ = thriftPartition.isIs_marked_cached();
+      }
+      if (thriftPartition.isSetHms_parameters()) {
+        hmsParameters_ = CatalogInterners.internParameters(
+            thriftPartition.getHms_parameters());
+      } else {
+        hmsParameters_ = new HashMap<>();
+      }
+      writeId_ = thriftPartition.isSetWrite_id() ?
+          thriftPartition.getWrite_id() : -1L;
+      return this;
+    }
+
+    /**
+     * Checks that this partition's metadata is well formed. This does not necessarily
+     * mean the partition is supported by Impala.
+     * Throws a CatalogException if there are any errors in the partition metadata.
+     */
+    public void checkWellFormed() throws CatalogException {
+      try {
+        // Validate all the partition key/values to ensure you can convert them toThrift()
+        Expr.treesToThrift(getPartitionValues());
+      } catch (Exception e) {
+        throw new CatalogException("Partition (" + getPartitionName() +
+            ") has invalid partition column values: ", e);
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 41f2a60..a5b231e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -108,6 +109,7 @@ import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
@@ -259,6 +261,13 @@ public class HdfsTable extends Table implements FeFsTable {
   // null in the case that this table is not transactional.
   protected ValidWriteIdList validWriteIds_ = null;
 
+  // Partitions are marked as "dirty" indicating there are in-progress modifications on
+  // their metadata. The corresponding partition builder contains the new version of the
+  // metadata and so represents the in-progress modifications. The modifications will be
+  // finalized in the coming incremental metadata refresh (see updatePartitionsFromHms()
+  // for more details). This map is only maintained in the catalogd.
+  private final Map<Long, HdfsPartition.Builder> dirtyPartitions_ = new HashMap<>();
+
   // Represents a set of storage-related statistics aggregated at the table or partition
   // level.
   public final static class FileMetadataStats {
@@ -440,13 +449,47 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * Gets the HdfsPartition matching the given partition spec. Returns null if no match
-   * was found.
+   * Marks a partition dirty by registering the partition builder for its new instance.
+   */
+  public void markDirtyPartition(HdfsPartition.Builder partBuilder) {
+    dirtyPartitions_.put(partBuilder.getOldId(), partBuilder);
+  }
+
+  /**
+   * @return true if there are any in-progress modifications on the metadata of this
+   * partition.
+   */
+  public boolean isDirtyPartition(HdfsPartition partition) {
+    return dirtyPartitions_.containsKey(partition.getId());
+  }
+
+  /**
+   * Pick up the partition builder to continue the in-progress modifications.
+   * The builder is then unregistered, so callers should guarantee that the in-progress
+   * modifications are finalized (by calling Builder.build() and using the new instance
+   * to replace the old one).
+   * @return the builder for given partition's new instance.
    */
-  public HdfsPartition getPartition(List<PartitionKeyValue> partitionSpec) {
-    return (HdfsPartition)getPartition(this, partitionSpec);
+  public HdfsPartition.Builder pickInprogressPartitionBuilder(HdfsPartition partition) {
+    return dirtyPartitions_.remove(partition.getId());
   }
 
+  /**
+   * @return true if any partitions are dirty.
+   */
+  @Override
+  public boolean hasInProgressModification() { return !dirtyPartitions_.isEmpty(); }
+
+  /**
+   * Clears all the in-progress modifications by clearing all the partition builders.
+   */
+  @Override
+  public void resetInProgressModification() { dirtyPartitions_.clear(); }
+
+  /**
+   * Gets the PrunablePartition matching the given partition spec. Returns null if no
+   * match was found.
+   */
   public static PrunablePartition getPartition(FeFsTable table,
       List<PartitionKeyValue> partitionSpec) {
     List<TPartitionKeyValue> partitionKeyValues = new ArrayList<>();
@@ -563,8 +606,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * If there are no partitions in the Hive metadata, a single partition is added with no
    * partition keys.
    */
-  private long loadAllPartitions(IMetaStoreClient client,
-      List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
+  private long loadAllPartitions(IMetaStoreClient client, List<Partition> msPartitions,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
       CatalogException {
     Preconditions.checkNotNull(msTbl);
@@ -576,30 +618,30 @@ public class HdfsTable extends Table implements FeFsTable {
     Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
     accessLevel_ = getAvailableAccessLevel(getFullName(), tblLocation, permCache);
 
+    List<HdfsPartition.Builder> partBuilders = new ArrayList<>();
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
       // This table has no partition key, which means it has no declared partitions.
       // We model partitions slightly differently to Hive - every file must exist in a
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
-      HdfsPartition part = createPartition(msTbl.getSd(), null, permCache);
-      if (isMarkedCached_) part.markCached();
-      addPartition(part);
+      HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(),
+          /*msPartition=*/null, permCache);
+      partBuilder.setIsMarkedCached(isMarkedCached_);
+      setUnpartitionedTableStats(partBuilder);
+      partBuilders.add(partBuilder);
     } else {
-      for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
-        HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
-            permCache);
-        addPartition(partition);
-        // If the partition is null, its HDFS path does not exist, and it was not added
-        // to this table's partition list. Skip the partition.
-        if (partition == null) continue;
+      for (Partition msPartition: msPartitions) {
+        partBuilders.add(createPartitionBuilder(
+            msPartition.getSd(), msPartition, permCache));
       }
     }
     // Load the file metadata from scratch.
     Timer.Context fileMetadataLdContext = getMetrics().getTimer(
         HdfsTable.LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS).time();
-    loadFileMetadataForPartitions(client, partitionMap_.values(), /*isRefresh=*/false);
+    loadFileMetadataForPartitions(client, partBuilders, /*isRefresh=*/false);
     fileMetadataLdContext.stop();
+    for (HdfsPartition.Builder p : partBuilders) addPartition(p.build());
     return clock.getTick() - startTime;
   }
 
@@ -620,9 +662,11 @@ public class HdfsTable extends Table implements FeFsTable {
    *
    * @param isRefresh whether this is a refresh operation or an initial load. This only
    * affects logging.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadFileMetadataForPartitions(IMetaStoreClient client,
-      Collection<HdfsPartition> parts, boolean isRefresh) throws CatalogException {
+  private long loadFileMetadataForPartitions(IMetaStoreClient client,
+      Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
+      throws CatalogException {
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
 
@@ -635,7 +679,7 @@ public class HdfsTable extends Table implements FeFsTable {
     ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) : null;
     String logPrefix = String.format(
         "%s file and block metadata for %s paths for table %s",
-        isRefresh ? "Refreshing" : "Loading", parts.size(),
+        isRefresh ? "Refreshing" : "Loading", partBuilders.size(),
         getFullName());
 
     // Actually load the partitions.
@@ -643,7 +687,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // we'll throw an exception here and end up bailing out of whatever catalog operation
     // we're in the middle of. This could cause a partial metadata update -- eg we may
     // have refreshed the top-level table properties without refreshing the files.
-    new ParallelFileMetadataLoader(this, parts, validWriteIds_, validTxnList, logPrefix)
+    new ParallelFileMetadataLoader(
+        this, partBuilders, validWriteIds_, validTxnList, logPrefix)
         .loadAndSet();
 
     // TODO(todd): would be good to log a summary of the loading process:
@@ -651,15 +696,18 @@ public class HdfsTable extends Table implements FeFsTable {
     // - how many partitions did we read metadata for
     // - etc...
     String partNames = Joiner.on(", ").join(
-        Iterables.limit(Iterables.transform(parts, HdfsPartition::getPartitionName), 3));
-    if (parts.size() > 3) {
+        Iterables.limit(
+            Iterables.transform(partBuilders, HdfsPartition.Builder::getPartitionName),
+            3));
+    if (partBuilders.size() > 3) {
       partNames += String.format(", and %s others",
-          Iterables.size(parts) - 3);
+          Iterables.size(partBuilders) - 3);
     }
 
     long duration = clock.getTick() - startTime;
     LOG.info("Loaded file and block metadata for {} partitions: {}. Time taken: {}",
         getFullName(), partNames, PrintUtils.printTimeNs(duration));
+    return duration;
   }
 
   public FileSystem getFileSystem() throws CatalogException {
@@ -729,49 +777,54 @@ public class HdfsTable extends Table implements FeFsTable {
   public List<HdfsPartition> createAndLoadPartitions(IMetaStoreClient client,
       List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
       throws CatalogException {
-    List<HdfsPartition> addedParts = new ArrayList<>();
+    List<HdfsPartition.Builder> addedPartBuilders = new ArrayList<>();
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
     for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
-      HdfsPartition hdfsPartition = createPartition(partition.getSd(), partition,
-          permCache);
-      Preconditions.checkNotNull(hdfsPartition);
-      addedParts.add(hdfsPartition);
+      HdfsPartition.Builder partBuilder = createPartitionBuilder(partition.getSd(),
+          partition, permCache);
+      Preconditions.checkNotNull(partBuilder);
+      addedPartBuilders.add(partBuilder);
     }
-    loadFileMetadataForPartitions(client, addedParts, /* isRefresh = */ false);
-    return addedParts;
+    loadFileMetadataForPartitions(client, addedPartBuilders, /*isRefresh=*/false);
+    return addedPartBuilders.stream()
+        .map(HdfsPartition.Builder::build)
+        .collect(Collectors.toList());
   }
 
   /**
-   * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
-   * object.
+   * Creates a new HdfsPartition.Builder from a specified StorageDescriptor and an HMS
+   * partition object.
    */
-  private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
+  private HdfsPartition.Builder createPartitionBuilder(
+      StorageDescriptor storageDescriptor, Partition msPartition,
       FsPermissionCache permCache) throws CatalogException {
-    HdfsStorageDescriptor fileFormatDescriptor =
-        HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
-    List<LiteralExpr> keyValues;
-    if (msPartition != null) {
-      keyValues = FeCatalogUtils.parsePartitionKeyValues(this, msPartition.getValues());
-    } else {
-      keyValues = Collections.emptyList();
-    }
+    return createOrUpdatePartitionBuilder(
+        storageDescriptor, msPartition, permCache, null);
+  }
+
+  private HdfsPartition.Builder createOrUpdatePartitionBuilder(
+      StorageDescriptor storageDescriptor,
+      org.apache.hadoop.hive.metastore.api.Partition msPartition,
+      FsPermissionCache permCache, HdfsPartition.Builder partBuilder)
+      throws CatalogException {
+    if (partBuilder == null) partBuilder = new HdfsPartition.Builder(this);
+    partBuilder
+        .setMsPartition(msPartition)
+        .setFileFormatDescriptor(
+            HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor));
     Path partDirPath = new Path(storageDescriptor.getLocation());
     try {
       if (msPartition != null) {
-        HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
+        // Update the parameters based on validation against HDFS.
+        boolean isCached = HdfsCachingUtil.validateCacheParams(
+            partBuilder.getParameters());
+        partBuilder.setIsMarkedCached(isCached);
       }
       TAccessLevel accessLevel = getAvailableAccessLevel(getFullName(), partDirPath,
           permCache);
-      HdfsPartition partition =
-          new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
-          new ArrayList<FileDescriptor>(), accessLevel);
-      partition.checkWellFormed();
-      // Set the partition's #rows.
-      if (msPartition != null && msPartition.getParameters() != null) {
-         partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
-      }
-      if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
+      partBuilder.setAccessLevel(accessLevel);
+      partBuilder.checkWellFormed();
+      if (!TAccessLevelUtil.impliesWriteAccess(accessLevel)) {
           // TODO: READ_ONLY isn't exactly correct because the it's possible the
           // partition does not have READ permissions either. When we start checking
           // whether we can READ from a table, this should be updated to set the
@@ -780,7 +833,7 @@ public class HdfsTable extends Table implements FeFsTable {
           // WRITE_ONLY the table's access level should be NONE.
           accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      return partition;
+      return partBuilder;
     } catch (IOException e) {
       throw new CatalogException("Error initializing partition", e);
     }
@@ -833,6 +886,21 @@ public class HdfsTable extends Table implements FeFsTable {
     }
   }
 
+  public void updatePartitions(List<HdfsPartition.Builder> partBuilders)
+      throws CatalogException {
+    for (HdfsPartition.Builder p : partBuilders) updatePartition(p);
+  }
+
+  public void updatePartition(HdfsPartition.Builder partBuilder) throws CatalogException {
+    HdfsPartition oldPartition = partBuilder.getOldInstance();
+    Preconditions.checkNotNull(oldPartition);
+    Preconditions.checkState(partitionMap_.containsKey(oldPartition.getId()));
+    HdfsPartition newPartition = partBuilder.build();
+    // Partition is reloaded and hence cache directives are not dropped.
+    dropPartition(oldPartition, false);
+    addPartition(newPartition);
+  }
+
   /**
    * Drops the partition having the given partition spec from HdfsTable. Cleans up its
    * metadata from all the mappings used to speed up partition pruning/lookup.
@@ -862,13 +930,20 @@ public class HdfsTable extends Table implements FeFsTable {
     nameToPartitionMap_.remove(partition.getPartitionName());
     if (removeCacheDirective && partition.isMarkedCached()) {
       try {
-        HdfsCachingUtil.removePartitionCacheDirective(partition);
+        // Partition's parameters map is immutable. Create a temp one for the cleanup.
+        HdfsCachingUtil.removePartitionCacheDirective(Maps.newHashMap(
+            partition.getParameters()));
       } catch (ImpalaException e) {
         LOG.error("Unable to remove the cache directive on table " + getFullName() +
             ", partition " + partition.getPartitionName() + ": ", e);
       }
     }
-    if (!isStoredInImpaladCatalogCache()) return partition;
+    // dirtyPartitions_ is only maintained in the catalogd. nullPartitionIds_ and
+    // partitionValuesMap_ are only maintained in coordinators.
+    if (!isStoredInImpaladCatalogCache()) {
+      dirtyPartitions_.remove(partitionId);
+      return partition;
+    }
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -987,6 +1062,8 @@ public class HdfsTable extends Table implements FeFsTable {
           loadConstraintsInfo(client, msTbl);
         }
         loadValidWriteIdList(client);
+        // Set table-level stats first so partition stats can inherit it.
+        setTableStats(msTbl);
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
@@ -996,6 +1073,8 @@ public class HdfsTable extends Table implements FeFsTable {
           if (msTbl.getPartitionKeysSize() == 0) {
             if (loadParitionFileMetadata) {
               storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client);
+            } else {  // Update the single partition stats if the table stats changed.
+              updateUnpartitionedTableStats();
             }
           } else {
             storageMetadataLoadTime_ += updatePartitionsFromHms(
@@ -1015,9 +1094,10 @@ public class HdfsTable extends Table implements FeFsTable {
           allPartitionsLdContext.stop();
         }
         if (loadTableSchema) setAvroSchema(client, msTbl);
-        setTableStats(msTbl);
         fileMetadataStats_.unset();
         refreshLastUsedTime();
+        // Make sure all the partition modifications are done.
+        Preconditions.checkState(dirtyPartitions_.isEmpty());
       } catch (TableLoadingException e) {
         throw e;
       } catch (Exception e) {
@@ -1079,7 +1159,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * Returns time spent updating the file metadata in nanoseconds.
    *
    * This is optimized for the case where few files have changed. See
-   * {@link #refreshFileMetadata(Path, List)} above for details.
+   * {@link FileMetadataLoader#load} for details.
    */
   private long updateUnpartitionedTableFileMd(IMetaStoreClient client)
       throws CatalogException {
@@ -1087,26 +1167,36 @@ public class HdfsTable extends Table implements FeFsTable {
     if (LOG.isTraceEnabled()) {
       LOG.trace("update unpartitioned table: " + getFullName());
     }
-    final Clock clock = Clock.defaultClock();
-    long startTime = clock.getTick();
     HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values());
-
-    // Instead of updating the existing partition in place, we create a new one
-    // so that we reflect any changes in the msTbl object and also assign a new
-    // ID. This is one step towards eventually implementing IMPALA-7533.
     resetPartitions();
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     setPrototypePartition(msTbl.getSd());
-    HdfsPartition part = createPartition(msTbl.getSd(), null, new FsPermissionCache());
+    HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(),
+        /*msPartition=*/null, new FsPermissionCache());
     // Copy over the FDs from the old partition to the new one, so that
     // 'refreshPartitionFileMetadata' below can compare modification times and
     // reload the locations only for those that changed.
-    part.setFileDescriptors(oldPartition.getFileDescriptors());
-    addPartition(part);
-    if (isMarkedCached_) part.markCached();
-    loadFileMetadataForPartitions(client, ImmutableList.of(part), /*isRefresh=*/true);
-    return clock.getTick() - startTime;
+    partBuilder.setFileDescriptors(oldPartition.getFileDescriptors())
+        .setIsMarkedCached(isMarkedCached_);
+    long fileMdLoadTime = loadFileMetadataForPartitions(client,
+        ImmutableList.of(partBuilder), /*isRefresh=*/true);
+    setUnpartitionedTableStats(partBuilder);
+    addPartition(partBuilder.build());
+    return fileMdLoadTime;
+  }
+
+  /**
+   * Updates the single partition stats of an unpartitioned HdfsTable.
+   */
+  private void updateUnpartitionedTableStats() throws CatalogException {
+    // Just update the single partition if its #rows is stale.
+    HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values());
+    if (oldPartition.getNumRows() != getNumRows()) {
+      HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(oldPartition)
+          .setNumRows(getNumRows());
+      updatePartition(partBuilder);
+    }
   }
 
   /**
@@ -1123,6 +1213,7 @@ public class HdfsTable extends Table implements FeFsTable {
   private long updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
       throws Exception {
+    long fileMdLoadTime = 0;
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
@@ -1138,9 +1229,9 @@ public class HdfsTable extends Table implements FeFsTable {
     // Names of loaded partitions in this table
     Set<String> partitionNames = new HashSet<>();
     // Partitions for which file metadata must be loaded
-    List<HdfsPartition> partitionsToLoadFiles = Lists.newArrayList();
+    List<HdfsPartition.Builder> partitionsToLoadFiles = new ArrayList<>();
     // Partitions that need to be dropped and recreated from scratch
-    List<HdfsPartition> dirtyPartitions = new ArrayList<>();
+    List<HdfsPartition.Builder> dirtyPartitions = new ArrayList<>();
     // Partitions removed from the Hive Metastore.
     List<HdfsPartition> removedPartitions = new ArrayList<>();
     // Identify dirty partitions that need to be loaded from the Hive Metastore and
@@ -1150,30 +1241,27 @@ public class HdfsTable extends Table implements FeFsTable {
       // that were removed from HMS using some external process, e.g. Hive.
       if (!msPartitionNames.contains(partition.getPartitionName())) {
         removedPartitions.add(partition);
-      }
-      if (partition.isDirty()) {
-        // Dirty partitions are updated by removing them from table's partition
-        // list and loading them from the Hive Metastore.
-        dirtyPartitions.add(partition);
-      } else {
-        if (partitionsToUpdate == null && loadPartitionFileMetadata) {
-          partitionsToLoadFiles.add(partition);
-        }
+      } else if (isDirtyPartition(partition)) {
+        // Modifications of dirty partitions have already been started. Pick up the
+        // in-progress partition builders to finalize them.
+        dirtyPartitions.add(pickInprogressPartitionBuilder(partition));
+      } else if (partitionsToUpdate == null && loadPartitionFileMetadata) {
+        partitionsToLoadFiles.add(new HdfsPartition.Builder(partition));
       }
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
       partitionNames.add(partition.getPartitionName());
     }
     dropPartitions(removedPartitions);
-    // dirtyPartitions are reloaded and hence cache directives are not dropped.
-    dropPartitions(dirtyPartitions, false);
-    // Load dirty partitions from Hive Metastore
-    // TODO(todd): the logic around "dirty partitions" is highly suspicious.
-    loadPartitionsFromMetastore(dirtyPartitions, client);
+    // Load dirty partitions from the Hive Metastore. File metadata of dirty partitions
+    // is always reloaded (ignoring the loadPartitionFileMetadata flag).
+    fileMdLoadTime += loadPartitionsFromMetastore(dirtyPartitions, client);
+    Preconditions.checkState(!hasInProgressModification());
 
     // Identify and load partitions that were added in the Hive Metastore but don't
-    // exist in this table.
+    // exist in this table. Their file metadata will be loaded as well.
     Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
-    loadPartitionsFromMetastore(newPartitionsInHms, client);
+    fileMdLoadTime += loadPartitionsFromMetastore(newPartitionsInHms,
+        /*inprogressPartBuilders=*/null, client);
     // If a list of modified partitions (old and new) is specified, don't reload file
     // metadata for the new ones as they have already been detected in HMS and have been
     // reloaded by loadPartitionsFromMetastore().
@@ -1184,19 +1272,19 @@ public class HdfsTable extends Table implements FeFsTable {
     // Load file metadata. Until we have a notification mechanism for when a
     // file changes in hdfs, it is sometimes required to reload all the file
     // descriptors and block metadata of a table (e.g. REFRESH statement).
-    long fileLoadMdTime = 0;
     if (loadPartitionFileMetadata) {
-      final Clock clock = Clock.defaultClock();
-      long startTime = clock.getTick();
       if (partitionsToUpdate != null) {
         Preconditions.checkState(partitionsToLoadFiles.isEmpty());
         // Only reload file metadata of partitions specified in 'partitionsToUpdate'
-        partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate);
+        List<HdfsPartition> parts = getPartitionsForNames(partitionsToUpdate);
+        partitionsToLoadFiles = parts.stream().map(HdfsPartition.Builder::new)
+            .collect(Collectors.toList());
       }
-      loadFileMetadataForPartitions(client, partitionsToLoadFiles, /* isRefresh=*/true);
-      fileLoadMdTime = clock.getTick() - startTime;
+      fileMdLoadTime += loadFileMetadataForPartitions(client, partitionsToLoadFiles,
+          /* isRefresh=*/true);
+      updatePartitions(partitionsToLoadFiles);
     }
-    return fileLoadMdTime;
+    return fileMdLoadTime;
   }
 
   /**
@@ -1218,18 +1306,11 @@ public class HdfsTable extends Table implements FeFsTable {
     return parts;
   }
 
-  @Override
-  public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    super.setTableStats(msTbl);
+  private void setUnpartitionedTableStats(HdfsPartition.Builder partBuilder) {
+    Preconditions.checkState(numClusteringCols_ == 0);
     // For unpartitioned tables set the numRows in its single partition
     // to the table's numRows.
-    if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
-      // Unpartitioned tables have a default partition.
-      Preconditions.checkState(partitionMap_.size() == 1);
-      for (HdfsPartition p: partitionMap_.values()) {
-        p.setNumRows(getNumRows());
-      }
-    }
+    partBuilder.setNumRows(getNumRows());
   }
 
   /**
@@ -1318,48 +1399,66 @@ public class HdfsTable extends Table implements FeFsTable {
   /**
    * Loads partitions from the Hive Metastore and adds them to the internal list of
    * table partitions.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
+  private long loadPartitionsFromMetastore(List<HdfsPartition.Builder> partitions,
       IMetaStoreClient client) throws Exception {
     Preconditions.checkNotNull(partitions);
-    if (partitions.isEmpty()) return;
+    if (partitions.isEmpty()) return 0;
     if (LOG.isTraceEnabled()) {
       LOG.trace(String.format("Incrementally updating %d/%d partitions.",
           partitions.size(), partitionMap_.size()));
     }
-    Set<String> partitionNames = new HashSet<>();
-    for (HdfsPartition part: partitions) {
-      partitionNames.add(part.getPartitionName());
+    Map<String, HdfsPartition.Builder> partBuilders = Maps.newHashMap();
+    for (HdfsPartition.Builder part: partitions) {
+      partBuilders.put(part.getPartitionName(), part);
     }
-    loadPartitionsFromMetastore(partitionNames, client);
+    return loadPartitionsFromMetastore(partBuilders.keySet(), partBuilders, client);
   }
 
   /**
-   * Loads from the Hive Metastore the partitions that correspond to the specified
-   * 'partitionNames' and adds them to the internal list of table partitions.
+   * Loads from the Hive Metastore and the file system the partitions that correspond
+   * to the specified 'partitionNames' and adds them to, or updates them in, the
+   * internal list of table partitions.
+   * If 'inprogressPartBuilders' is null, new partitions will be created.
+   * If 'inprogressPartBuilders' is not null, takes over the in-progress modifications
+   * and finalizes them by updating the existing partitions.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadPartitionsFromMetastore(Set<String> partitionNames,
-      IMetaStoreClient client) throws Exception {
+  private long loadPartitionsFromMetastore(Set<String> partitionNames,
+      Map<String, HdfsPartition.Builder> inprogressPartBuilders, IMetaStoreClient client)
+      throws Exception {
     Preconditions.checkNotNull(partitionNames);
-    if (partitionNames.isEmpty()) return;
+    if (partitionNames.isEmpty()) return 0;
     // Load partition metadata from Hive Metastore.
-    List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
-        new ArrayList<>();
-    msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
-        Lists.newArrayList(partitionNames), db_.getName(), name_));
+    List<Partition> msPartitions = new ArrayList<>(
+        MetaStoreUtil.fetchPartitionsByName(
+            client, Lists.newArrayList(partitionNames), db_.getName(), name_));
 
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
-    List<HdfsPartition> partitions = new ArrayList<>(msPartitions.size());
+    List<HdfsPartition.Builder> partBuilders = new ArrayList<>(msPartitions.size());
     for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
-      HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
-          permCache);
-      // If the partition is null, its HDFS path does not exist, and it was not added to
-      // this table's partition list. Skip the partition.
-      if (partition == null) continue;
-      partitions.add(partition);
+      String partName = FeCatalogUtils.getPartitionName(this, msPartition.getValues());
+      HdfsPartition.Builder partBuilder = null;
+      if (inprogressPartBuilders != null) {
+        // If we have an in-progress partition modification, update the partition builder.
+        partBuilder = inprogressPartBuilders.get(partName);
+        Preconditions.checkNotNull(partBuilder);
+      }
+      partBuilder = createOrUpdatePartitionBuilder(
+          msPartition.getSd(), msPartition, permCache, partBuilder);
+      partBuilders.add(partBuilder);
+    }
+    long fileMdLoadTime = loadFileMetadataForPartitions(client, partBuilders,
+        /* isRefresh=*/false);
+    for (HdfsPartition.Builder p : partBuilders) {
+      if (inprogressPartBuilders == null) {
+        addPartition(p.build());
+      } else {
+        updatePartition(p);
+      }
     }
-    loadFileMetadataForPartitions(client, partitions, /* isRefresh=*/false);
-    for (HdfsPartition partition : partitions) addPartition(partition);
+    return fileMdLoadTime;
   }
 
   /**
@@ -1445,13 +1544,14 @@ public class HdfsTable extends Table implements FeFsTable {
     resetPartitions();
     try {
       for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
-        HdfsPartition hdfsPart =
-            HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
-        addPartition(hdfsPart);
+        addPartition(new HdfsPartition.Builder(this, part.getKey())
+            .fromThrift(part.getValue())
+            .build());
       }
-      prototypePartition_ = HdfsPartition.fromThrift(this,
-          CatalogObjectsConstants.PROTOTYPE_PARTITION_ID,
-          hdfsTable.prototype_partition);
+      prototypePartition_ =
+          new HdfsPartition.Builder(this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID)
+              .fromThrift(hdfsTable.prototype_partition)
+              .build();
     } catch (CatalogException e) {
       throw new TableLoadingException(e.getMessage());
     }
@@ -1986,20 +2086,18 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   public void reloadPartition(IMetaStoreClient client, HdfsPartition oldPartition,
       Partition hmsPartition) throws CatalogException {
-    // Instead of updating the existing partition in place, we create a new one
-    // so that we reflect any changes in the hmsPartition object and also assign a new
-    // ID. This is one step towards eventually implementing IMPALA-7533.
-    HdfsPartition refreshedPartition = createPartition(
+    HdfsPartition.Builder partBuilder = createPartitionBuilder(
         hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
     Preconditions.checkArgument(oldPartition == null
-        || HdfsPartition.KV_COMPARATOR.compare(oldPartition, refreshedPartition) == 0);
+        || HdfsPartition.comparePartitionKeyValues(
+            oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
     if (oldPartition != null) {
-      refreshedPartition.setFileDescriptors(oldPartition.getFileDescriptors());
+      partBuilder.setFileDescriptors(oldPartition.getFileDescriptors());
     }
-    loadFileMetadataForPartitions(client, ImmutableList.of(refreshedPartition),
+    loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
         /*isRefresh=*/true);
     dropPartition(oldPartition, false);
-    addPartition(refreshedPartition);
+    addPartition(partBuilder.build());
   }
 
   /**
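
For readers less familiar with the pattern, the refactor above stops mutating HdfsPartition
in place: every modification goes through HdfsPartition.Builder, and an immutable partition
is materialized only after its file metadata has been loaded. Below is a minimal,
self-contained sketch of that flow; the SketchPartition and LoadAllPartitionsSketch classes
and the String "file descriptors" are simplified stand-ins for illustration, not the real
catalog classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Simplified stand-in for HdfsPartition: immutable once built. The real class carries
    // far more state (file descriptors, access level, caching flags, HMS parameters, ...).
    final class SketchPartition {
      final String name;
      final List<String> fileDescriptors;
      final long numRows;

      private SketchPartition(String name, List<String> fds, long numRows) {
        this.name = name;
        this.fileDescriptors = Collections.unmodifiableList(fds);
        this.numRows = numRows;
      }

      // Stand-in for HdfsPartition.Builder: accumulates the new state and produces an
      // immutable instance only when build() is called.
      static final class Builder {
        private final String name;
        private List<String> fileDescriptors = new ArrayList<>();
        private long numRows = -1;

        Builder(String name) { this.name = name; }

        Builder setFileDescriptors(List<String> fds) {
          this.fileDescriptors = new ArrayList<>(fds);
          return this;
        }

        Builder setNumRows(long numRows) { this.numRows = numRows; return this; }

        SketchPartition build() { return new SketchPartition(name, fileDescriptors, numRows); }
      }
    }

    public class LoadAllPartitionsSketch {
      public static void main(String[] args) {
        // 1. Create a builder per HMS partition (createPartitionBuilder in the patch).
        List<SketchPartition.Builder> builders = Arrays.asList(
            new SketchPartition.Builder("p=1"), new SketchPartition.Builder("p=2"));
        // 2. Load file metadata into the builders (loadFileMetadataForPartitions).
        for (SketchPartition.Builder b : builders) {
          b.setFileDescriptors(Arrays.asList("data0.parq")).setNumRows(100);
        }
        // 3. Only then materialize and register the immutable partitions
        //    (addPartition(b.build()) in the patch).
        for (SketchPartition.Builder b : builders) {
          System.out.println(b.build().name);
        }
      }
    }
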
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index c41b180..5381f2e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -70,10 +70,11 @@ public class ParallelFileMetadataLoader {
 
   private final String logPrefix_;
   private final Map<Path, FileMetadataLoader> loaders_;
-  private final Map<Path, List<HdfsPartition>> partsByPath_;
+  private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
   private final FileSystem fs_;
 
-  public ParallelFileMetadataLoader(HdfsTable table, Collection<HdfsPartition> parts,
+  public ParallelFileMetadataLoader(HdfsTable table,
+      Collection<HdfsPartition.Builder> partBuilders,
       ValidWriteIdList writeIdList, ValidTxnList validTxnList, String logPrefix)
       throws CatalogException {
     if (writeIdList != null || validTxnList != null) {
@@ -84,14 +85,14 @@ public class ParallelFileMetadataLoader {
     // Group the partitions by their path (multiple partitions may point to the same
     // path).
     partsByPath_ = Maps.newHashMap();
-    for (HdfsPartition p : parts) {
+    for (HdfsPartition.Builder p : partBuilders) {
       Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
       partsByPath_.computeIfAbsent(partPath, (path) -> new ArrayList<>())
           .add(p);
     }
     // Create a FileMetadataLoader for each path.
     loaders_ = Maps.newHashMap();
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
       FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
           Utils.shouldRecursivelyListPartitions(table), oldFds, table.getHostIndex(),
@@ -100,7 +101,7 @@ public class ParallelFileMetadataLoader {
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
       boolean hasCachedPartition = Iterables.any(e.getValue(),
-          HdfsPartition::isMarkedCached);
+          HdfsPartition.Builder::isMarkedCached);
       loader.setForceRefreshBlockLocations(hasCachedPartition);
       loaders_.put(e.getKey(), loader);
     }
@@ -117,12 +118,12 @@ public class ParallelFileMetadataLoader {
     load();
 
     // Store the loaded FDs into the partitions.
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       Path p = e.getKey();
       FileMetadataLoader loader = loaders_.get(p);
 
-      for (HdfsPartition part : e.getValue()) {
-        part.setFileDescriptors(loader.getLoadedFds());
+      for (HdfsPartition.Builder partBuilder : e.getValue()) {
+        partBuilder.setFileDescriptors(loader.getLoadedFds());
       }
     }
   }
@@ -135,12 +136,12 @@ public class ParallelFileMetadataLoader {
   Map<HdfsPartition, List<FileDescriptor>> loadAndGet() throws TableLoadingException {
     load();
     Map<HdfsPartition, List<FileDescriptor>> result = Maps.newHashMap();
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       Path p = e.getKey();
       FileMetadataLoader loader = loaders_.get(p);
 
-      for (HdfsPartition part : e.getValue()) {
-        result.put(part, loader.getLoadedFds());
+      for (HdfsPartition.Builder partBuilder : e.getValue()) {
+        result.put(partBuilder.getOldInstance(), loader.getLoadedFds());
       }
     }
     return result;
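
The loader groups partition builders by their filesystem path so that each path is listed
only once, then fans the loaded file descriptors out to every builder sharing that path. A
small sketch of that grouping, using placeholder types (String paths and a PartBuilder
stand-in) rather than the real Path, FileMetadataLoader and HdfsPartition.Builder classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupByPathSketch {
      // Placeholder for HdfsPartition.Builder: only the location matters for grouping.
      static final class PartBuilder {
        final String name;
        final String location;
        List<String> fileDescriptors;
        PartBuilder(String name, String location) { this.name = name; this.location = location; }
      }

      public static void main(String[] args) {
        List<PartBuilder> builders = Arrays.asList(
            new PartBuilder("p1", "/warehouse/t/k=1"),
            new PartBuilder("p2", "/warehouse/t/k=1"),  // shares a path with p1
            new PartBuilder("p3", "/warehouse/t/k=2"));

        // Group the builders by path (multiple partitions may point to the same path).
        Map<String, List<PartBuilder>> partsByPath = new HashMap<>();
        for (PartBuilder b : builders) {
          partsByPath.computeIfAbsent(b.location, path -> new ArrayList<>()).add(b);
        }

        // One (fake) file listing per path; every builder for the path gets the same result.
        for (Map.Entry<String, List<PartBuilder>> e : partsByPath.entrySet()) {
          List<String> loadedFds = Arrays.asList(e.getKey() + "/data.parq");
          for (PartBuilder b : e.getValue()) {
            b.fileDescriptors = loadedFds;
            System.out.println(b.name + " -> " + b.fileDescriptors);
          }
        }
      }
    }
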
diff --git a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
index bc36439..e47591a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
@@ -134,7 +134,7 @@ public class PartitionStatsUtil {
    * partition's stats are removed.
    */
   public static void partStatsToPartition(TPartitionStats partStats,
-      HdfsPartition partition) throws ImpalaException {
+      HdfsPartition.Builder partition) throws ImpalaException {
     if (partStats == null) {
       partition.setPartitionStatsBytes(null, false);
       return;
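
With this signature change, stats helpers write into the builder instead of the live
HdfsPartition, so readers of the catalog never see a half-updated partition; the new
instance replaces the old one only once the builder is complete. A tiny sketch of the
idea, with placeholder types and a made-up applyStats helper standing in for
partStatsToPartition and TPartitionStats:

    import java.util.HashMap;
    import java.util.Map;

    public class StatsToBuilderSketch {
      // Placeholder builder: only the pieces a stats helper would touch.
      static final class PartBuilder {
        long numRows = -1;
        final Map<String, String> parameters = new HashMap<>();
      }

      // Analogous in spirit to PartitionStatsUtil.partStatsToPartition: mutate the builder only.
      static void applyStats(long numRows, PartBuilder builder) {
        builder.numRows = numRows;
        builder.parameters.put("numRows", String.valueOf(numRows));
      }

      public static void main(String[] args) {
        PartBuilder builder = new PartBuilder();
        applyStats(42, builder);
        // The live partition is replaced only after the builder is complete (build + swap).
        System.out.println(builder.parameters);
      }
    }
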
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2071f75..c1bb27f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -798,4 +798,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   public ValidWriteIdList getValidWriteIds() {
     return null;
   }
+
+  /**
+   * When altering a table we modify the cached metadata and apply the new metadata to
+   * HMS. Some DDL/DMLs require reloading the HMS and file metadata to update the cached
+   * metadata (See CatalogOpExecutor#alterTable(TAlterTableParams, TDdlExecResponse) for
+   * more details). Before reloading the metadata, these modifications are not finalized.
+   * @return true if there are any in-progress modifications that need to be finalized
+   * in an incremental table reload.
+   */
+  public boolean hasInProgressModification() { return false; }
+
+  /**
+   * Clears the in-progress modifications in case of failures.
+   */
+  public void resetInProgressModification() { }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b2d6014..854d1d9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -846,6 +846,8 @@ public class CatalogOpExecutor {
               "Unknown ALTER TABLE operation type: " + params.getAlter_type());
       }
 
+      // Make sure we don't forget to finalize the modification.
+      if (tbl.hasInProgressModification()) Preconditions.checkState(reloadMetadata);
       if (reloadMetadata) {
         loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
             reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
@@ -854,9 +856,13 @@ public class CatalogOpExecutor {
         catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
         addTableToCatalogUpdate(tbl, response.result);
       }
+      // Make sure all the modifications are done.
+      Preconditions.checkState(!tbl.hasInProgressModification());
     } finally {
       context.stop();
       UnlockWriteLockIfErronouslyLocked();
+      // Clear in-progress modifications in case of exceptions.
+      tbl.resetInProgressModification();
       tbl.getLock().unlock();
     }
   }
@@ -1126,7 +1132,7 @@ public class CatalogOpExecutor {
 
     // Update partition-level row counts and incremental column stats for
     // partitioned Hdfs tables.
-    List<HdfsPartition> modifiedParts = null;
+    List<HdfsPartition.Builder> modifiedParts = null;
     if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
       Preconditions.checkState(table instanceof HdfsTable);
       modifiedParts = updatePartitionStats(params, (HdfsTable) table);
@@ -1159,10 +1165,10 @@ public class CatalogOpExecutor {
    * Row counts for missing or new partitions as a result of concurrent table alterations
    * are set to 0.
    */
-  private List<HdfsPartition> updatePartitionStats(TAlterTableUpdateStatsParams params,
-      HdfsTable table) throws ImpalaException {
+  private List<HdfsPartition.Builder> updatePartitionStats(
+      TAlterTableUpdateStatsParams params, HdfsTable table) throws ImpalaException {
     Preconditions.checkState(params.isSetPartition_stats());
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     // TODO(todd) only load the partitions that were modified in 'params'.
     Collection<? extends FeFsPartition> parts =
         FeCatalogUtils.loadAllPartitions(table);
@@ -1199,12 +1205,13 @@ public class CatalogOpExecutor {
         LOG.trace(String.format("Updating stats for partition %s: numRows=%d",
             partition.getValuesAsString(), numRows));
       }
-      PartitionStatsUtil.partStatsToPartition(partitionStats, partition);
-      partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+      HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+      PartitionStatsUtil.partStatsToPartition(partitionStats, partBuilder);
+      partBuilder.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
       // HMS requires this param for stats changes to take effect.
-      partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
-      partition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-      modifiedParts.add(partition);
+      partBuilder.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
+      partBuilder.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+      modifiedParts.add(partBuilder);
     }
     return modifiedParts;
   }
@@ -1520,18 +1527,17 @@ public class CatalogOpExecutor {
           return;
         }
 
-        for(HdfsPartition partition : partitions) {
+        for (HdfsPartition partition : partitions) {
           if (partition.getPartitionStatsCompressed() != null) {
-            partition.dropPartitionStats();
-            try {
-              applyAlterPartition(table, partition);
-            } finally {
-              partition.markDirty();
-            }
+            HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+            partBuilder.dropPartitionStats();
+            applyAlterPartition(table, partBuilder);
+            hdfsTbl.updatePartition(partBuilder);
           }
         }
       }
-      loadTableMetadata(table, newCatalogVersion, false, true, null, "DROP STATS");
+      loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
+          /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP STATS");
       addTableToCatalogUpdate(table, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
@@ -1602,24 +1608,27 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(hdfsTable);
 
     // List of partitions that were modified as part of this operation.
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     Collection<? extends FeFsPartition> parts =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
     for (FeFsPartition fePart: parts) {
       // TODO(todd): avoid downcast
       HdfsPartition part = (HdfsPartition) fePart;
-      boolean isModified = false;
+      HdfsPartition.Builder partBuilder = null;
       if (part.getPartitionStatsCompressed() != null) {
-        part.dropPartitionStats();
-        isModified = true;
+        partBuilder = new HdfsPartition.Builder(part)
+            .dropPartitionStats();
       }
 
       // Remove the ROW_COUNT parameter if it has been set.
-      if (part.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
-        isModified = true;
+      if (part.getParameters().containsKey(StatsSetupConst.ROW_COUNT)) {
+        if (partBuilder == null) {
+          partBuilder = new HdfsPartition.Builder(part);
+        }
+        partBuilder.getParameters().remove(StatsSetupConst.ROW_COUNT);
       }
 
-      if (isModified) modifiedParts.add(part);
+      if (partBuilder != null) modifiedParts.add(partBuilder);
     }
 
     bulkAlterPartitions(table, modifiedParts, null);
@@ -1944,8 +1953,12 @@ public class CatalogOpExecutor {
           FeCatalogUtils.loadAllPartitions(hdfsTable);
       for (FeFsPartition part: parts) {
         if (part.isMarkedCached()) {
+          HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(
+              (HdfsPartition) part);
           try {
-            HdfsCachingUtil.removePartitionCacheDirective(part);
+            HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
+            // We are dropping the table, so there is no need to update the existing
+            // partition; ignore the partBuilder here.
           } catch (Exception e) {
             LOG.error("Unable to uncache partition: " + part.getPartitionName(), e);
           }
@@ -3153,10 +3166,11 @@ public class CatalogOpExecutor {
       Preconditions.checkArgument(tbl instanceof HdfsTable);
       List<HdfsPartition> partitions =
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
-        partition.setFileFormat(HdfsFileFormat.fromThrift(fileFormat));
-        modifiedParts.add(partition);
+        modifiedParts.add(
+            new HdfsPartition.Builder(partition)
+                .setFileFormat(HdfsFileFormat.fromThrift(fileFormat)));
       }
       bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3190,10 +3204,11 @@ public class CatalogOpExecutor {
     } else {
       List<HdfsPartition> partitions =
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
-        HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, partition.getSerdeInfo());
-        modifiedParts.add(partition);
+        HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+        HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, partBuilder.getSerdeInfo());
+        modifiedParts.add(partBuilder);
       }
       bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3231,11 +3246,12 @@ public class CatalogOpExecutor {
       TableName tableName = tbl.getTableName();
       HdfsPartition partition = catalog_.getHdfsPartition(
           tableName.getDb(), tableName.getTbl(), partitionSpec);
-      partition.setLocation(location);
+      HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+      partBuilder.setLocation(location);
       try {
-        applyAlterPartition(tbl, partition);
+        applyAlterPartition(tbl, partBuilder);
       } finally {
-        partition.markDirty();
+        ((HdfsTable) tbl).markDirtyPartition(partBuilder);
       }
     }
     return reloadFileMetadata;
@@ -3256,26 +3272,27 @@ public class CatalogOpExecutor {
       List<HdfsPartition> partitions =
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(params.getPartition_set());
 
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
+        HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
         switch (params.getTarget()) {
           case TBL_PROPERTY:
-            partition.getParameters().putAll(properties);
+            partBuilder.getParameters().putAll(properties);
             break;
           case SERDE_PROPERTY:
-            partition.getSerdeInfo().getParameters().putAll(properties);
+            partBuilder.getSerdeInfo().getParameters().putAll(properties);
             break;
           default:
             throw new UnsupportedOperationException(
                 "Unknown target TTablePropertyType: " + params.getTarget());
         }
-        modifiedParts.add(partition);
+        modifiedParts.add(partBuilder);
       }
       try {
         bulkAlterPartitions(tbl, modifiedParts, null);
       } finally {
-        for (HdfsPartition modifiedPart : modifiedParts) {
-          modifiedPart.markDirty();
+        for (HdfsPartition.Builder modifiedPart : modifiedParts) {
+          ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
         }
       }
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3373,17 +3390,18 @@ public class CatalogOpExecutor {
           // needs to be updated
           if (!partition.isMarkedCached() ||
               HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
+            HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
             try {
               // If the partition was already cached, update the directive otherwise
               // issue new cache directive
               if (!partition.isMarkedCached()) {
                 cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
-                    partition, cacheOp.getCache_pool_name(), cacheReplication));
+                    partBuilder, cacheOp.getCache_pool_name(), cacheReplication));
               } else {
                 Long directiveId = HdfsCachingUtil.getCacheDirectiveId(
                     partition.getParameters());
                 cacheDirIds.add(HdfsCachingUtil.modifyCacheDirective(directiveId,
-                    partition, cacheOp.getCache_pool_name(), cacheReplication));
+                    partBuilder, cacheOp.getCache_pool_name(), cacheReplication));
               }
             } catch (ImpalaRuntimeException e) {
               if (partition.isMarkedCached()) {
@@ -3397,9 +3415,9 @@ public class CatalogOpExecutor {
 
             // Update the partition metadata.
             try {
-              applyAlterPartition(tbl, partition);
+              applyAlterPartition(tbl, partBuilder);
             } finally {
-              partition.markDirty();
+              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
             }
           }
         }
@@ -3425,11 +3443,12 @@ public class CatalogOpExecutor {
           // TODO(todd): avoid downcast
           HdfsPartition partition = (HdfsPartition) fePartition;
           if (partition.isMarkedCached()) {
-            HdfsCachingUtil.removePartitionCacheDirective(partition);
+            HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+            HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
             try {
-              applyAlterPartition(tbl, partition);
+              applyAlterPartition(tbl, partBuilder);
             } finally {
-              partition.markDirty();
+              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
             }
           }
         }
@@ -3461,23 +3480,25 @@ public class CatalogOpExecutor {
     Preconditions.checkArgument(tbl instanceof HdfsTable);
     List<HdfsPartition> partitions =
         ((HdfsTable) tbl).getPartitionsFromPartitionSet(params.getPartition_set());
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     if (cacheOp.isSet_cached()) {
       for (HdfsPartition partition : partitions) {
         // The directive is null if the partition is not cached
         Long directiveId =
             HdfsCachingUtil.getCacheDirectiveId(partition.getParameters());
+        HdfsPartition.Builder partBuilder = null;
         short replication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
         List<Long> cacheDirs = Lists.newArrayList();
         if (directiveId == null) {
+          partBuilder = new HdfsPartition.Builder(partition);
           cacheDirs.add(HdfsCachingUtil.submitCachePartitionDirective(
-              partition, cacheOp.getCache_pool_name(), replication));
+              partBuilder, cacheOp.getCache_pool_name(), replication));
         } else {
           if (HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
+            partBuilder = new HdfsPartition.Builder(partition);
             HdfsCachingUtil.validateCachePool(cacheOp, directiveId, tableName, partition);
             cacheDirs.add(HdfsCachingUtil.modifyCacheDirective(
-                directiveId, partition, cacheOp.getCache_pool_name(),
-                replication));
+                directiveId, partBuilder, cacheOp.getCache_pool_name(), replication));
           }
         }
 
@@ -3487,23 +3508,22 @@ public class CatalogOpExecutor {
           catalog_.watchCacheDirs(cacheDirs, tableName.toThrift(),
               "ALTER PARTITION SET CACHED");
         }
-        if (!partition.isMarkedCached()) {
-          modifiedParts.add(partition);
-        }
+        if (partBuilder != null) modifiedParts.add(partBuilder);
       }
     } else {
       for (HdfsPartition partition : partitions) {
         if (partition.isMarkedCached()) {
-          HdfsCachingUtil.removePartitionCacheDirective(partition);
-          modifiedParts.add(partition);
+          HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+          HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
+          modifiedParts.add(partBuilder);
         }
       }
     }
     try {
       bulkAlterPartitions(tbl, modifiedParts, null);
     } finally {
-      for (HdfsPartition modifiedPart : modifiedParts) {
-        modifiedPart.markDirty();
+      for (HdfsPartition.Builder modifiedPart : modifiedParts) {
+        ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
       }
     }
     numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3668,7 +3688,7 @@ public class CatalogOpExecutor {
    * processor to skip the event generated on the partition.
    */
   private void addToInflightVersionsOfPartition(
-      Map<String, String> partitionParams, HdfsPartition hdfsPartition) {
+      Map<String, String> partitionParams, HdfsPartition.Builder partBuilder) {
     if (!catalog_.isEventProcessingActive()) return;
     Preconditions.checkState(partitionParams != null);
     String version = partitionParams
@@ -3680,7 +3700,7 @@ public class CatalogOpExecutor {
     // catalog service identifiers
     if (catalog_.getCatalogServiceId().equals(serviceId)) {
       Preconditions.checkNotNull(version);
-      hdfsPartition.addToVersionsForInflightEvents(false, Long.parseLong(version));
+      partBuilder.addToVersionsForInflightEvents(false, Long.parseLong(version));
     }
   }
 
@@ -3795,14 +3815,14 @@ public class CatalogOpExecutor {
     }
   }
 
-  private void applyAlterPartition(Table tbl, HdfsPartition partition)
+  private void applyAlterPartition(Table tbl, HdfsPartition.Builder partBuilder)
       throws ImpalaException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      Partition hmsPartition = partition.toHmsPartition();
+      Partition hmsPartition = partBuilder.toHmsPartition();
       addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), hmsPartition);
       applyAlterHmsPartitions(tbl.getMetaStoreTable().deepCopy(), msClient,
           tbl.getTableName(), Arrays.asList(hmsPartition));
-      addToInflightVersionsOfPartition(hmsPartition.getParameters(), partition);
+      addToInflightVersionsOfPartition(hmsPartition.getParameters(), partBuilder);
     }
   }
 
@@ -3939,51 +3959,48 @@ public class CatalogOpExecutor {
    * reduces the time spent in a single update and helps avoid metastore client
    * timeouts.
    */
-  private void bulkAlterPartitions(Table tbl, List<HdfsPartition> modifiedParts,
+  private void bulkAlterPartitions(Table tbl, List<HdfsPartition.Builder> modifiedParts,
       TblTransaction tblTxn) throws ImpalaException {
-    List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
-        Lists.newArrayList();
-    for (HdfsPartition p: modifiedParts) {
-      org.apache.hadoop.hive.metastore.api.Partition msPart = p.toHmsPartition();
+    // Map from msPartitions to the partition builders. Use IdentityHashMap since
+    // modifications will change the hash codes of the msPartitions.
+    Map<Partition, HdfsPartition.Builder> msPartitionToBuilders =
+        Maps.newIdentityHashMap();
+    for (HdfsPartition.Builder p: modifiedParts) {
+      Partition msPart = p.toHmsPartition();
       if (msPart != null) {
         addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), msPart);
-        hmsPartitions.add(msPart);
+        msPartitionToBuilders.put(msPart, p);
       }
     }
-    if (hmsPartitions.isEmpty()) return;
+    if (msPartitionToBuilders.isEmpty()) return;
 
     String dbName = tbl.getDb().getName();
     String tableName = tbl.getName();
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (List<Partition> hmsPartitionsSubList :
-        Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
+      for (List<Partition> msPartitionsSubList : Iterables.partition(
+          msPartitionToBuilders.keySet(), MAX_PARTITION_UPDATES_PER_RPC)) {
         try {
           // Alter partitions in bulk.
           if (tblTxn != null) {
             MetastoreShim.alterPartitionsWithTransaction(msClient.getHiveClient(), dbName,
-                tableName, hmsPartitionsSubList, tblTxn);
-          }
-          else {
+                tableName, msPartitionsSubList, tblTxn);
+          } else {
             MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
-                hmsPartitionsSubList);
+                msPartitionsSubList);
           }
           // Mark the corresponding HdfsPartition objects as dirty
-          for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
-              hmsPartitionsSubList) {
-            try {
-              HdfsPartition hdfsPartition = catalog_.getHdfsPartition(dbName, tableName,
-                  msPartition);
-              hdfsPartition.markDirty();
-              // if event processing is turned on add the version number from partition
-              // paramters to the HdfsPartition's list of in-flight events
-              addToInflightVersionsOfPartition(msPartition.getParameters(),
-                  hdfsPartition);
-            } catch (PartitionNotFoundException e) {
-              LOG.error(String.format("Partition of table %s could not be found: %s",
-                  tableName, e.getMessage()));
-              continue;
-            }
+          for (Partition msPartition: msPartitionsSubList) {
+            HdfsPartition.Builder partBuilder = msPartitionToBuilders.get(msPartition);
+            Preconditions.checkNotNull(partBuilder);
+            // TODO(IMPALA-9779): Should we always mark this as dirty? It will trigger
+            //  a file metadata reload for this partition. Consider removing this and
+            //  marking the "dirty" flag in callers. Callers that don't need to reload
+            //  file metadata can build and replace the partition directly.
+            ((HdfsTable) tbl).markDirtyPartition(partBuilder);
+            // If event processing is turned on add the version number from partition
+            // parameters to the HdfsPartition's list of in-flight events.
+            addToInflightVersionsOfPartition(msPartition.getParameters(), partBuilder);
           }
         } catch (TException e) {
           throw new ImpalaRuntimeException(
@@ -4352,7 +4369,7 @@ public class CatalogOpExecutor {
                   for (org.apache.hadoop.hive.metastore.api.Partition part:
                       cachedHmsParts) {
                     try {
-                      HdfsCachingUtil.removePartitionCacheDirective(part);
+                      HdfsCachingUtil.removePartitionCacheDirective(part.getParameters());
                     } catch (ImpalaException e1) {
                       String msg = String.format(
                           "Partition %s.%s(%s): State: Leaked caching directive. " +
@@ -4383,7 +4400,6 @@ public class CatalogOpExecutor {
         // For non-partitioned table, only single part exists
         FeFsPartition singlePart = Iterables.getOnlyElement((List<FeFsPartition>) parts);
         affectedExistingPartitions.add(singlePart);
-
       }
       unsetTableColStats(table.getMetaStoreTable(), tblTxn);
       // Submit the watch request for the given cache directives.
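
A note on the IdentityHashMap comment in the CatalogOpExecutor change above: the
HMS Partition objects used as map keys can still be modified after they are put
into the map, which changes their value-based hash codes, so a plain HashMap
could lose track of them. Below is a minimal, self-contained sketch of that
failure mode. It is illustrative only and not part of the patch; MutableKey is a
hypothetical stand-in for a mutable object whose hashCode depends on its fields
(as is the case for Thrift-generated classes such as Partition).

    import java.util.HashMap;
    import java.util.IdentityHashMap;
    import java.util.Map;
    import java.util.Objects;

    public class IdentityMapDemo {
      // Hypothetical key type whose hashCode depends on its mutable state.
      static final class MutableKey {
        String name;
        MutableKey(String name) { this.name = name; }
        @Override public int hashCode() { return Objects.hashCode(name); }
        @Override public boolean equals(Object o) {
          return o instanceof MutableKey
              && Objects.equals(((MutableKey) o).name, name);
        }
      }

      public static void main(String[] args) {
        MutableKey key = new MutableKey("p=2020-06-15");
        Map<MutableKey, String> valueHashed = new HashMap<>();
        Map<MutableKey, String> identityHashed = new IdentityHashMap<>();
        valueHashed.put(key, "builder");
        identityHashed.put(key, "builder");

        // Mutating the key after insertion changes its value-based hash code.
        key.name = "p=2020-06-16";

        // Prints "null": the hash recorded at insertion no longer matches.
        System.out.println(valueHashed.get(key));
        // Prints "builder": IdentityHashMap keys on reference identity,
        // which mutation does not affect.
        System.out.println(identityHashed.get(key));
      }
    }
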
diff --git a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
index b3f8bd7..1a0c1d9 100644
--- a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
@@ -100,7 +100,7 @@ public class HdfsCachingUtil {
    * Returns the ID of the submitted cache directive and throws if there is an error
    * submitting the directive.
    */
-  public static long submitCachePartitionDirective(HdfsPartition part,
+  public static long submitCachePartitionDirective(HdfsPartition.Builder part,
       String poolName, short replication) throws ImpalaRuntimeException {
     long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
         poolName, replication);
@@ -154,7 +154,7 @@ public class HdfsCachingUtil {
    * data. Also updates the partition's metadata to remove the cache directive ID.
    * No-op if the table is not cached.
    */
-  public static void removePartitionCacheDirective(FeFsPartition part)
+  public static void removePartitionCacheDirective(HdfsPartition.Builder part)
       throws ImpalaException {
     Preconditions.checkNotNull(part);
     Map<String, String> parameters = part.getParameters();
@@ -174,17 +174,16 @@ public class HdfsCachingUtil {
   }
 
   /**
-   * Convenience method for working directly on a metastore partition. See
-   * removePartitionCacheDirective(HdfsPartition) for more details.
+   * Convenience method for working directly on a metastore partition params map. See
+   * removePartitionCacheDirective(HdfsPartition.Builder) for more details.
    */
   public static void removePartitionCacheDirective(
-      org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
-    Preconditions.checkNotNull(part);
-    Long id = getCacheDirectiveId(part.getParameters());
+      Map<String, String> partitionParams) throws ImpalaException {
+    Long id = getCacheDirectiveId(partitionParams);
     if (id == null) return;
     HdfsCachingUtil.removeDirective(id);
-    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
-    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
+    partitionParams.remove(CACHE_DIR_ID_PROP_NAME);
+    partitionParams.remove(CACHE_DIR_REPLICATION_PROP_NAME);
   }
 
   /**
@@ -348,8 +347,8 @@ public class HdfsCachingUtil {
    * Update cache directive for a partition and update the metastore parameters.
    * Returns the cache directive ID
    */
-  public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName,
-      short replication) throws ImpalaRuntimeException {
+  public static long modifyCacheDirective(Long id, HdfsPartition.Builder part,
+      String poolName, short replication) throws ImpalaRuntimeException {
     Preconditions.checkNotNull(id);
     HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
         poolName, replication);
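
For context on the HdfsCachingUtil changes above: the caching helpers now take
the mutable HdfsPartition.Builder, or just an HMS parameter map, instead of
HdfsPartition and the HMS Partition, matching the builder-based partition
modification flow introduced by this refactor. A rough call-site sketch under
the new signatures is below; it is a fragment, and the variable names (msPart,
partBuilder, id, poolName, replication) are assumptions rather than code from
the patch.

    // Previously the helpers received the partition objects directly, e.g.:
    //   HdfsCachingUtil.removePartitionCacheDirective(msPart);
    //   HdfsCachingUtil.modifyCacheDirective(id, hdfsPartition, poolName, replication);

    // With this patch they receive the builder or the bare parameter map, so the
    // updated caching properties are written to the builder's copy rather than to
    // the live HdfsPartition:
    HdfsCachingUtil.removePartitionCacheDirective(msPart.getParameters());
    long directiveId =
        HdfsCachingUtil.modifyCacheDirective(id, partBuilder, poolName, replication);
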
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 838983d..4138c2f 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -236,14 +236,14 @@ public class CatalogObjectToFromThriftTest {
     Assert.assertNotNull(part);;
     // Create a dummy partition with an invalid decimal type.
     try {
-      new HdfsPartition(hdfsTable, part.toHmsPartition(),
-          Lists.newArrayList(
+      new HdfsPartition.Builder(hdfsTable)
+          .setMsPartition(part.toHmsPartition())
+          .setPartitionKeyValues(Lists.newArrayList(
               LiteralExpr.createFromUnescapedStr(
                   "11.1", ScalarType.createDecimalType(1, 0)),
               LiteralExpr.createFromUnescapedStr(
-                  "11.1", ScalarType.createDecimalType(1, 0))),
-          null, new ArrayList<>(),
-          TAccessLevel.READ_WRITE);
+                  "11.1", ScalarType.createDecimalType(1, 0))))
+          .setAccessLevel(TAccessLevel.READ_WRITE);
       fail("Expected metadata to be malformed.");
     } catch (SqlCastException e) {
       Assert.assertTrue(e.getMessage().contains(
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index a73e9a1..9a2ca3b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -421,9 +421,10 @@ public class CatalogTest {
     // reload path. Since we can't modify HDFS itself via these tests, we
     // do the next best thing: modify the metadata to revise history as
     // though the partition used above were actually empty.
-    HdfsPartition hdfsPartition = table
-        .getPartitionFromThriftPartitionSpec(partitionSpec);
-    hdfsPartition.setFileDescriptors(new ArrayList<>());
+    HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(
+        table.getPartitionFromThriftPartitionSpec(partitionSpec));
+    partBuilder.setFileDescriptors(new ArrayList<>());
+    table.updatePartition(partBuilder);
     stats.reset();
     catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), "test");