Posted to commits@impala.apache.org by bi...@apache.org on 2021/10/07 01:39:10 UTC

[impala] branch master updated (d2f866f -> cee2b42)

This is an automated email from the ASF dual-hosted git repository.

bikram pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from d2f866f  IMPALA-10935: Impala crashes on old Iceberg table property
     new d8d44f3  IMPALA-9857: Batching of consecutive partition events
     new 6196488  IMPALA-10914: Consistently schedule scan ranges for Iceberg tables
     new 5e3d439  IMPALA-10862 Optimization of the code structure of TmpDir
     new cee2b42  IMPALA-10942: Fix memory leak in admission controller

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/tmp-file-mgr-internal.h             | 114 ++++-
 be/src/runtime/tmp-file-mgr-test.cc                | 130 ++---
 be/src/runtime/tmp-file-mgr.cc                     | 521 ++++++++++++---------
 be/src/runtime/tmp-file-mgr.h                      |  50 +-
 be/src/scheduling/admission-control-service.cc     |   2 +-
 be/src/scheduling/admission-controller.cc          |   6 +-
 be/src/scheduling/admission-controller.h           |   2 +-
 .../scheduling/local-admission-control-client.cc   |   2 +-
 common/thrift/CatalogService.thrift                |  12 +
 .../impala/catalog/CatalogServiceCatalog.java      |  77 ++-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  38 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 101 +++-
 .../org/apache/impala/catalog/IcebergTable.java    |  29 +-
 .../main/java/org/apache/impala/catalog/Table.java |   1 +
 .../impala/catalog/events/MetastoreEvents.java     | 477 ++++++++++++++++---
 .../catalog/events/MetastoreEventsProcessor.java   |   5 +-
 .../impala/catalog/events/SelfEventContext.java    |  38 +-
 .../impala/catalog/iceberg/IcebergCatalogs.java    |  25 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  16 +-
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |   1 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  11 +
 .../impala/catalog/local/DirectMetaProvider.java   |   8 +
 .../impala/catalog/local/LocalIcebergTable.java    |  24 +-
 .../apache/impala/catalog/local/MetaProvider.java  |   6 +
 .../apache/impala/service/CatalogOpExecutor.java   | 231 ++++++---
 .../impala/service/IcebergCatalogOpExecutor.java   | 105 +++--
 .../java/org/apache/impala/util/IcebergUtil.java   |   8 +-
 .../events/MetastoreEventsProcessorTest.java       | 249 +++++++++-
 .../queries/QueryTest/iceberg-catalogs.test        |   2 -
 .../queries/QueryTest/show-create-table.test       |   2 +-
 tests/custom_cluster/test_events_custom_configs.py | 193 +++++++-
 tests/metadata/test_event_processing.py            |   1 +
 tests/metadata/test_show_create_table.py           |   4 +-
 tests/query_test/test_iceberg.py                   |  23 +
 tests/stress/test_insert_stress.py                 |  26 +-
 tests/util/event_processor_utils.py                |   9 +-
 37 files changed, 1866 insertions(+), 690 deletions(-)

[impala] 01/04: IMPALA-9857: Batching of consecutive partition events

Posted by bi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d8d44f3f147ce0f98bdd9e0387ae080010f55965
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Sep 8 14:24:49 2021 -0700

    IMPALA-9857: Batching of consecutive partition events
    
    This patch improves the performance of the events
    processor by batching together consecutive ALTER_PARTITION
    or INSERT events. Currently, without this patch, if the
    event stream contains many consecutive ALTER_PARTITION
    events which cannot be skipped, the events processor
    refreshes the partition from each event one by one.
    Similarly, for partition-level INSERT events, the events
    processor refreshes one partition at a time.
    
    By batching such consecutive ALTER_PARTITION or INSERT
    events together, the events processor needs to take the
    table lock only once per batch and can refresh all the
    partitions from the events using multiple threads. For
    transactional (ACID) tables the gain is even larger:
    currently the whole table is refreshed for each
    ALTER_PARTITION or partition-level INSERT event, whereas
    with batching the events processor refreshes the table
    only once per batch.
    
    A batch of eligible ALTER_PARTITION or INSERT events is
    processed as a single ALTER_PARTITIONS or
    INSERT_PARTITIONS event, respectively.
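
    The following is a rough, editorial sketch of the batching
    pass. It mirrors the createBatchEvents() logic added in
    this patch, but uses a stripped-down, hypothetical event
    type; names such as Ev, BatchingSketch and createBatches
    are illustrative only and do not exist in the codebase:

    import java.util.ArrayList;
    import java.util.List;

    class BatchingSketch {
      // Minimal stand-in for a partition-level metastore event.
      static final class Ev {
        final long eventId;
        final String table;
        Ev(long eventId, String table) { this.eventId = eventId; this.table = table; }
      }

      // Two events are eligible for the same batch only if they have
      // consecutive event ids and refer to the same table.
      static boolean canBeBatched(Ev last, Ev next) {
        return next.eventId == last.eventId + 1
            && last.table.equalsIgnoreCase(next.table);
      }

      // Groups consecutive eligible events into batches, preserving order.
      static List<List<Ev>> createBatches(List<Ev> events) {
        List<List<Ev>> batches = new ArrayList<>();
        List<Ev> current = new ArrayList<>();
        for (Ev ev : events) {
          if (!current.isEmpty()
              && !canBeBatched(current.get(current.size() - 1), ev)) {
            batches.add(current);
            current = new ArrayList<>();
          }
          current.add(ev);
        }
        if (!current.isEmpty()) batches.add(current);
        return batches;
      }
    }

    In the actual patch the eligibility check is stricter: only
    partition-level events are batched, and for ALTER_PARTITION
    the catalog version and service id carried in the event must
    also match, so that a whole batch is either entirely
    self-events or entirely not.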
    
    Performance tests:
    In order to simulate a large number of ALTER_PARTITION and
    INSERT events, a simple test was performed by running the
    following query from Hive:
    insert into store_sales_copy partition(ss_sold_date_sk)
    select * from store_sales;
    
    This query generates 1824 ALTER_PARTITION and 1824 INSERT
    events. The time taken to process all the generated events
    was measured before and after the patch, for both an
    external table and an ACID table.
    
    Table Type              Before          After
    ======================================================
    External table          75 sec          25 sec
    ACID table              313 sec         47 sec
    
    Additionally, the patch fixes a minor bug in the
    evaluateSelfEvent() method, which should return false when
    the serviceId does not match.
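
    As a minimal, editorial sketch of the corrected control
    flow (the real check lives in
    CatalogServiceCatalog.evaluateSelfEvent() and operates on a
    SelfEventContext; the class and names below are simplified
    stand-ins):

    import java.util.Set;

    class SelfEventCheckSketch {
      // An event is a self-event only if it carries this catalog's own
      // service id and a version number found in the in-flight list.
      static boolean isSelfEvent(String myServiceId, String serviceIdFromEvent,
          long versionNumber, Set<Long> inFlightVersions) {
        if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
        if (!myServiceId.equals(serviceIdFromEvent)) {
          // The early return below is the fix: before this patch the
          // mismatch was only logged and evaluation kept going.
          return false;
        }
        return inFlightVersions.remove(versionNumber);
      }
    }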
    
    Testing Done:
    1. Added new tests which cover the batching logic of events.
    2. Exhaustive tests.
    
    Change-Id: I5d27a68a64436d31731e9a219b1efd6fc842de73
    Reviewed-on: http://gerrit.cloudera.org:8080/17848
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sourabh Goyal <so...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  72 ++--
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 101 ++++-
 .../impala/catalog/events/MetastoreEvents.java     | 477 +++++++++++++++++----
 .../catalog/events/MetastoreEventsProcessor.java   |   5 +-
 .../impala/catalog/events/SelfEventContext.java    |  38 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  63 +--
 .../events/MetastoreEventsProcessorTest.java       | 249 ++++++++++-
 tests/custom_cluster/test_events_custom_configs.py | 135 ++++--
 tests/metadata/test_event_processing.py            |   1 +
 tests/util/event_processor_utils.py                |   9 +-
 11 files changed, 941 insertions(+), 216 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5129134..34eea1c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION;
 import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -48,9 +49,9 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationDelta;
@@ -62,15 +63,12 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
-import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogTableMetrics;
-import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.HmsApiNameEnum;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
-import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -936,31 +934,32 @@ public class CatalogServiceCatalog extends Catalog {
    * evaluate if this is a self-event or not
    * @return true if given event information evaluates to a self-event, false otherwise
    */
-  public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx)
-      throws CatalogException {
+  public boolean evaluateSelfEvent(SelfEventContext ctx) throws CatalogException {
     Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled when calling this method");
+    boolean isInsertEvent = ctx.isInsertEventContext();
     long versionNumber =
-        isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent();
+        isInsertEvent ? ctx.getInsertEventId(0) : ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
 
     if (!isInsertEvent) {
       // no version info or service id in the event
       if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
-        LOG.info("Not a self-event since the given version is {} and service id is {}",
+        LOG.debug("Not a self-event since the given version is {} and service id is {}",
             versionNumber, serviceIdFromEvent.isEmpty() ? "empty" : serviceIdFromEvent);
         return false;
       }
       // if the service id from event doesn't match with our service id this is not a
       // self-event
       if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
-        LOG.info("Not a self-event because service id of this catalog {} does not match "
+        LOG.debug("Not a self-event because service id of this catalog {} does not match "
                 + "with one in event {}.",
             getCatalogServiceId(), serviceIdFromEvent);
+        return false;
       }
     } else if (versionNumber == -1) {
       // if insert event, we only compare eventId
-      LOG.info("Not a self-event because eventId is {}", versionNumber);
+      LOG.debug("Not a self-event because eventId is {}", versionNumber);
       return false;
     }
     Db db = getDb(ctx.getDbName());
@@ -978,7 +977,7 @@ public class CatalogServiceCatalog extends Catalog {
       try {
         boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
         if (!removed) {
-          LOG.info("Could not find version {} in the in-flight event list of database "
+          LOG.debug("Could not find version {} in the in-flight event list of database "
               + "{}", versionNumber, db.getName());
         }
         return removed;
@@ -1005,27 +1004,30 @@ public class CatalogServiceCatalog extends Catalog {
         boolean removed =
             tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
         if (!removed) {
-          LOG.info("Could not find {} {} in in-flight event list of table {}",
+          LOG.debug("Could not find {} {} in in-flight event list of table {}",
               isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName());
         }
         return removed;
       }
       if (tbl instanceof HdfsTable) {
         List<String> failingPartitions = new ArrayList<>();
-        for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
+        int len = partitionKeyValues.size();
+        for (int i=0; i<len; ++i) {
+          List<TPartitionKeyValue> partitionKeyValue = partitionKeyValues.get(i);
+          versionNumber = isInsertEvent ? ctx.getInsertEventId(i) : versionNumber;
           HdfsPartition hdfsPartition =
               ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
           if (hdfsPartition == null
-              || !hdfsPartition.removeFromVersionsForInflightEvents(
-                     isInsertEvent, versionNumber)) {
+              || !hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent,
+                  versionNumber)) {
             // even if this is an error condition we should not bail out early since we
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
             if (hdfsPartition == null) {
-              LOG.info("Partition {} not found during self-event "
-                + "evaluation for the table {}", partName, tbl.getFullName());
+              LOG.debug("Partition {} not found during self-event "
+                  + "evaluation for the table {}", partName, tbl.getFullName());
             } else {
-              LOG.info("Could not find {} in in-flight event list of the partition {} "
+              LOG.trace("Could not find {} in in-flight event list of the partition {} "
                   + "of table {}", versionNumber, partName, tbl.getFullName());
             }
             failingPartitions.add(partName);
@@ -1045,8 +1047,8 @@ public class CatalogServiceCatalog extends Catalog {
    * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId
    * for Insert Event.
    * @param tbl Catalog table
-   * @param versionNumber when isInsertEvent is true, it is eventId to add
-   * when isInsertEvent is false, it is version number to add
+   * @param versionNumber when isInsertEventContext is true, it is eventId to add
+   * when isInsertEventContext is false, it is version number to add
    */
   public void addVersionsForInflightEvents(
       boolean isInsertEvent, Table tbl, long versionNumber) {
@@ -2568,30 +2570,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Refresh partition if it exists.
-   *
-   * @return true if partition was reloaded, else false.
-   * @throws CatalogException if partition reload threw an error.
-   * @throws DatabaseNotFoundException if Db doesn't exist.
-   * @throws TableNotFoundException if table doesn't exist.
-   * @throws TableNotLoadedException if table is not loaded in Catalog.
-   */
-  public boolean reloadPartitionIfExists(String dbName, String tblName,
-      List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
-    Table table = getTable(dbName, tblName);
-    if (table == null) {
-      throw new TableNotFoundException(dbName + "." + tblName + " not found");
-    }
-    if (table instanceof IncompleteTable) {
-      throw new TableNotLoadedException(dbName + "." + tblName + " is not loaded");
-    }
-    Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
-    reloadPartition(table, tPartSpec, wasPartitionRefreshed,
-        CatalogObject.ThriftObjectType.NONE, reason);
-    return wasPartitionRefreshed.getRef();
-  }
-
-  /**
    * Refresh table if exists. Returns true if reloadTable() succeeds, false
    * otherwise.
    */
@@ -2986,7 +2964,11 @@ public class CatalogServiceCatalog extends Catalog {
         throw new CatalogException("Error loading metadata for partition: "
             + hdfsTable.getFullName() + " " + partitionName, e);
       }
-      hdfsTable.reloadPartition(msClient.getHiveClient(), hdfsPartition, hmsPartition);
+      Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
+      // note that hdfsPartition can be null here which is a valid input argument
+      // in such a case a new hdfsPartition is added and nothing is removed.
+      hmsPartToHdfsPart.put(hmsPartition, hdfsPartition);
+      hdfsTable.reloadPartitions(msClient.getHiveClient(), hmsPartToHdfsPart);
     }
     hdfsTable.setCatalogVersion(newCatalogVersion);
     wasPartitionReloaded.setRef(true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 81920be..25f8aea 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -875,7 +875,8 @@ public class HdfsPartition extends CatalogObjectImpl
    * Insert events. If false, remove version number from list of versions for in-flight
    * DDL events.
    * @param versionNumber when isInsertEvent is true, it's eventId to remove
-   *                      when isInsertEvent is false, it's version number to remove
+   *                      when isInsertEvent is false, it's version number to
+   *                      remove.
    * @return true if the versionNumber was removed, false if it didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(
@@ -885,8 +886,8 @@ public class HdfsPartition extends CatalogObjectImpl
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
     boolean ret = inFlightEvents_.remove(isInsertEvent, versionNumber);
     if (!ret) {
-      LOG.debug("Remove of in-flight version number failed for {}: {}", versionNumber,
-          inFlightEvents_.print());
+      LOG.trace("Failed to remove in-flight version number {}: in-flight events: {}",
+          versionNumber, inFlightEvents_.print());
     }
     return ret;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 4424149..8959344 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -2674,25 +2676,90 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * Reloads the metadata of partition 'oldPartition' by removing
-   * it from the table and reconstructing it from the HMS partition object
-   * 'hmsPartition'. If old partition is null then nothing is removed and
-   * and partition constructed from 'hmsPartition' is simply added.
+   * Generates a debug string useful for logging purposes. It returns a string consisting
+   * of names from the given list until the limit and then appends the count of the
+   * remaining items.
    */
-  public void reloadPartition(IMetaStoreClient client, HdfsPartition oldPartition,
-      Partition hmsPartition) throws CatalogException {
-    HdfsPartition.Builder partBuilder = createPartitionBuilder(
-        hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
-    Preconditions.checkArgument(oldPartition == null
-        || HdfsPartition.comparePartitionKeyValues(
-            oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
-    if (oldPartition != null) {
-      partBuilder.setFileDescriptors(oldPartition);
+  private static String generateDebugStr(List<String> items, int limit) {
+    String result = Joiner.on(',').join(Iterables.limit(items, limit));
+    if (items.size() > limit) {
+      result = String.format("%s... and %s others", result, items.size()-limit);
+    }
+    return result;
+  }
+
+  /**
+   * Reloads the HdfsPartitions which correspond to the given partNames. Returns the
+   * number of partitions which were reloaded.
+   */
+  public int reloadPartitionsFromNames(IMetaStoreClient client,
+      List<String> partNames, String reason) throws CatalogException {
+    Preconditions.checkState(partNames != null && !partNames.isEmpty());
+    LOG.info(String.format("Reloading partition metadata: %s %s (%s)",
+        getFullName(), generateDebugStr(partNames, 3), reason));
+    List<Partition> hmsPartitions;
+    Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
+    try {
+      hmsPartitions = client.getPartitionsByNames(getDb().getName(),
+          getName(), partNames);
+      for (Partition partition : hmsPartitions) {
+        List<LiteralExpr> partExprs = getTypeCompatiblePartValues(partition.getValues());
+        HdfsPartition hdfsPartition = getPartition(partExprs);
+        if (hdfsPartition != null) {
+          hmsPartToHdfsPart.put(partition, hdfsPartition);
+        }
+      }
+      reloadPartitions(client, hmsPartToHdfsPart);
+      return hmsPartToHdfsPart.size();
+    } catch (NoSuchObjectException e) {
+      // HMS throws a NoSuchObjectException if the table does not exist
+      // in HMS anymore. In case the partitions don't exist in HMS it does not include
+      // them in the result of getPartitionsByNames.
+      throw new TableLoadingException(
+          "Error when reloading partitions for table " + getFullName(), e);
+    } catch (TException | UnsupportedEncodingException e2) {
+      throw new CatalogException(
+          "Unexpected error while retrieving partitions for table " + getFullName(), e2);
+    }
+  }
+
+  /**
+   * Reloads the metadata of partitions given by a map of HMS Partitions to existing (old)
+   * HdfsPartitions.
+   * @param hmsPartsToHdfsParts The map of HMS partition object to the old HdfsPartition.
+   *                            Every key-value in this map represents the HdfsPartition
+   *                            which needs to be removed from the table and
+   *                            reconstructed from the HMS partition key. If the
+   *                            value for a given partition key is null then nothing is
+   *                            removed and a new HdfsPartition is simply added.
+   */
+  public void reloadPartitions(IMetaStoreClient client,
+      Map<Partition, HdfsPartition> hmsPartsToHdfsParts) throws CatalogException {
+    FsPermissionCache permissionCache = new FsPermissionCache();
+    Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
+    for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
+      Partition hmsPartition = entry.getKey();
+      HdfsPartition oldPartition = entry.getValue();
+      HdfsPartition.Builder partBuilder = createPartitionBuilder(
+          hmsPartition.getSd(), hmsPartition, permissionCache);
+      Preconditions.checkArgument(oldPartition == null
+          || HdfsPartition.comparePartitionKeyValues(
+          oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
+      if (oldPartition != null) {
+        partBuilder.setFileDescriptors(oldPartition);
+      }
+      partBuilderToPartitions.put(partBuilder, oldPartition);
+    }
+    // load file metadata in parallel
+    loadFileMetadataForPartitions(client,
+        partBuilderToPartitions.keySet(),/*isRefresh=*/true);
+    for (Map.Entry<HdfsPartition.Builder, HdfsPartition> entry :
+        partBuilderToPartitions.entrySet()) {
+      if (entry.getValue() != null) {
+        dropPartition(entry.getValue(), false);
+      }
+      addPartition(entry.getKey().build());
     }
-    loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
-        /*isRefresh=*/true);
-    dropPartition(oldPartition, false);
-    addPartition(partBuilder.build());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index f9e83f7..93d3af0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -102,8 +102,10 @@ public class MetastoreEvents {
     ALTER_DATABASE("ALTER_DATABASE"),
     ADD_PARTITION("ADD_PARTITION"),
     ALTER_PARTITION("ALTER_PARTITION"),
+    ALTER_PARTITIONS("ALTER_PARTITIONS"),
     DROP_PARTITION("DROP_PARTITION"),
     INSERT("INSERT"),
+    INSERT_PARTITIONS("INSERT_PARTITIONS"),
     OTHER("OTHER");
 
     private final String eventType_;
@@ -252,9 +254,52 @@ public class MetastoreEvents {
           + "filtered out: %d", sizeBefore, numFilteredEvents));
       metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
               .inc(numFilteredEvents);
-      LOG.info("Incremented skipped metric to " + metrics_
+      LOG.debug("Incremented skipped metric to " + metrics_
           .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
-      return metastoreEvents;
+      return createBatchEvents(metastoreEvents);
+    }
+
+    /**
+     * This method batches together any eligible consecutive elements from the given
+     * list of {@code MetastoreEvent}. The returned list may or may not contain batch
+     * events depending on whether it finds any events which could be batched together.
+     */
+    @VisibleForTesting
+    List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
+      if (events.size() < 2) return events;
+      int i = 0, j = 1;
+      List<MetastoreEvent> batchedEventList = new ArrayList<>();
+      MetastoreEvent current = events.get(i);
+      // startEventId points to the current event's or the start of the batch
+      // in case current is a batch event.
+      long startEventId = current.getEventId();
+      while (j < events.size()) {
+        MetastoreEvent next = events.get(j);
+        // check if the current metastore event and the next can be batched together
+        if (!current.canBeBatched(next)) {
+          // events cannot be batched, add the current event under consideration to the
+          // list and update current to the next
+          if (current.getNumberOfEvents() > 1) {
+            current.infoLog("Created a batch event for {} events from {} to {}",
+                current.getNumberOfEvents(), startEventId, current.getEventId());
+            metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+          }
+          batchedEventList.add(current);
+          current = next;
+          startEventId = next.getEventId();
+        } else {
+          // next can be batched into current event
+          current = Preconditions.checkNotNull(current.addToBatchEvents(next));
+        }
+        j++;
+      }
+      if (current.getNumberOfEvents() > 1) {
+        current.infoLog("Created a batch event for {} events from {} to {}",
+            current.getNumberOfEvents(), startEventId, current.getEventId());
+        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+      }
+      batchedEventList.add(current);
+      return batchedEventList;
     }
   }
 
@@ -293,10 +338,11 @@ public class MetastoreEvents {
     protected final String tblName_;
 
     // eventId of the event. Used instead of calling getter on event_ everytime
-    protected long eventId_;
+    private long eventId_;
 
-    // eventType from the NotificationEvent
-    protected final MetastoreEventType eventType_;
+    // eventType from the NotificationEvent or in case of batch events set using
+    // the setter for this field
+    private MetastoreEventType eventType_;
 
     // Actual notificationEvent object received from Metastore
     protected final NotificationEvent metastoreNotificationEvent_;
@@ -321,6 +367,17 @@ public class MetastoreEvents {
 
     public long getEventId() { return eventId_; }
 
+    public MetastoreEventType getEventType() { return eventType_; }
+
+    /**
+     * Certain events like {@link BatchPartitionEvent} batch a group of events
+     * to create a batch event type. This method is used to override the event type
+     * in such cases since the event type is not really derived from NotificationEvent.
+     */
+    public void setEventType(MetastoreEventType type) {
+      this.eventType_ = type;
+    }
+
     public String getCatalogName() { return catalogName_; }
 
     public String getDbName() { return dbName_; }
@@ -336,9 +393,9 @@ public class MetastoreEvents {
     public void processIfEnabled()
         throws CatalogException, MetastoreNotificationException {
       if (isEventProcessingDisabled()) {
-        LOG.info(debugString("Skipping this event because of flag evaluation"));
+        infoLog("Skipping this event because of flag evaluation");
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
         return;
       }
@@ -346,6 +403,45 @@ public class MetastoreEvents {
     }
 
     /**
+     * Checks if the given event can be batched into this event. Default behavior is
+     * to return false which can be overridden by a sub-class.
+     *
+     * @param event The event under consideration to be batched into this event.
+     * @return false if event cannot be batched into this event; otherwise true.
+     */
+    protected boolean canBeBatched(MetastoreEvent event) { return false; }
+
+    /**
+     * Adds the given event into the batch of events represented by this event. Default
+     * implementation is to return null. Sub-classes must override this method to
+     * implement batching.
+     *
+     * @param event The event which needs to be added to the batch.
+     * @return The batch event which represents all the events batched into this event
+     * until now including the given event.
+     */
+    protected MetastoreEvent addToBatchEvents(MetastoreEvent event) { return null; }
+
+    /**
+     * Returns the number of events represented by this event. For most events this is
+     * 1. In case of batch events this could be more than 1.
+     */
+    protected int getNumberOfEvents() { return 1; }
+
+    /**
+     * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
+     * some events because they are not interesting from catalogd's perspective.
+     * @return true if this event can be skipped.
+     */
+    protected boolean canBeSkipped() { return false; }
+
+    /**
+     * In case of batch events, this method can be used override the {@code eventType_}
+     * field which is used for logging purposes.
+     */
+    protected MetastoreEventType getBatchEventType() { return null; }
+
+    /**
      * Process the information available in the NotificationEvent to take appropriate
      * action on Catalog
      *
@@ -376,8 +472,8 @@ public class MetastoreEvents {
      */
     private Object[] getLogFormatArgs(Object[] args) {
       Object[] formatArgs = new Object[args.length + 2];
-      formatArgs[0] = eventId_;
-      formatArgs[1] = eventType_;
+      formatArgs[0] = getEventId();
+      formatArgs[1] = getEventType();
       int i = 2;
       for (Object arg : args) {
         formatArgs[i] = arg;
@@ -412,6 +508,17 @@ public class MetastoreEvents {
     }
 
     /**
+     * Similar to infoLog excepts logs at trace level
+     */
+    protected void traceLog(String logFormattedStr, Object... args) {
+      if (!LOG.isTraceEnabled()) return;
+      String formatString =
+          new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
+      Object[] formatArgs = getLogFormatArgs(args);
+      LOG.trace(formatString, formatArgs);
+    }
+
+    /**
      * Search for a inverse event (for example drop_table is a inverse event for
      * create_table) for this event from a given list of notificationEvents starting for
      * the startIndex. This is useful for skipping certain events from processing
@@ -450,16 +557,15 @@ public class MetastoreEvents {
      * serviceId and version. More details on complete flow of self-event handling
      * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
      *
-     * @param isInsertEvent if true, check in flight events list of Insert event
-     * if false, check events list of DDL
      * @return True if this event is a self-generated event. If the returned value is
      * true, this method also clears the version number from the catalog database/table.
      * Returns false if the version numbers or service id don't match
      */
-    protected boolean isSelfEvent(boolean isInsertEvent) {
+    protected boolean isSelfEvent() {
       try {
-        if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) {
-          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(getNumberOfEvents());
           infoLog("Incremented events skipped counter to {}",
               metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
                   .getCount());
@@ -471,8 +577,6 @@ public class MetastoreEvents {
       }
       return false;
     }
-
-    protected boolean isSelfEvent() { return isSelfEvent(false); }
   }
 
   public static String getStringProperty(
@@ -492,18 +596,15 @@ public class MetastoreEvents {
     // case of alter events
     protected org.apache.hadoop.hive.metastore.api.Table msTbl_;
 
+    // in case of partition batch events, this method can be overridden to return
+    // the partition object from the events which are batched together.
+    protected Partition getPartitionForBatching() { return null; }
+
     private MetastoreTableEvent(CatalogOpExecutor catalogOpExecutor,
         Metrics metrics, NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
+      Preconditions.checkNotNull(dbName_, "Database name cannot be null");
       tblName_ = Preconditions.checkNotNull(event.getTableName());
-      if (MetastoreEventType.OTHER.equals(eventType_)) {
-        debugLog("Creating event {} of type {} ({}) on table {}", eventId_, eventType_,
-            event.getEventType(), getFullyQualifiedTblName());
-      } else {
-        debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
-            getFullyQualifiedTblName());
-      }
     }
 
 
@@ -618,23 +719,21 @@ public class MetastoreEvents {
     }
 
     /**
-     * Refreshes a partition provided by given spec only if the table is loaded
-     * @param partition the Partition object which needs to be reloaded.
-     * @param reason Event type which caused the refresh, used for logging by catalog
-     * @return false if the table or database did not exist or was not loaded, else
-     * returns true.
-     * @throws CatalogException
+     * Reloads the partitions provided, only if the table is loaded and if the partitions
+     * exist in catalogd.
+     * @param partitions the list of Partition objects which need to be reloaded.
+     * @param reason The reason for reload operation which is used for logging by
+     *               catalogd.
      */
-    protected boolean reloadPartition(Partition partition, String reason)
+    protected void reloadPartitions(List<Partition> partitions, String reason)
         throws CatalogException {
       try {
-        boolean result = catalogOpExecutor_
-            .reloadPartitionIfExists(eventId_, dbName_, tblName_, partition, reason);
-        if (result) {
+        int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
+            dbName_, tblName_, partitions, reason);
+        if (numPartsRefreshed > 0) {
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
-              .inc();
+              .inc(numPartsRefreshed);
         }
-        return result;
       } catch (TableNotLoadedException e) {
         debugLog("Ignoring the event since table {} is not loaded",
             getFullyQualifiedTblName());
@@ -642,7 +741,6 @@ public class MetastoreEvents {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
       }
-      return false;
     }
   }
 
@@ -654,8 +752,8 @@ public class MetastoreEvents {
         NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
       Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
-      debugLog("Creating event {} of type {} on database {}", eventId_,
-              eventType_, dbName_);
+      debugLog("Creating event {} of type {} on database {}", getEventId(),
+              getEventType(), dbName_);
     }
 
     /**
@@ -685,7 +783,7 @@ public class MetastoreEvents {
     private CreateTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
       Preconditions
           .checkNotNull(event.getMessage(), debugString("Event message is null"));
       CreateTableMessage createTableMessage =
@@ -719,12 +817,12 @@ public class MetastoreEvents {
       // a self-event (see description of self-event in the class documentation of
       // MetastoreEventsProcessor)
       try {
-        if (catalogOpExecutor_.addTableIfNotRemovedLater(eventId_, msTbl_)) {
+        if (catalogOpExecutor_.addTableIfNotRemovedLater(getEventId(), msTbl_)) {
           infoLog("Successfully added table {}", getFullyQualifiedTblName());
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
         } else {
           metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-          infoLog("Incremented skipped metric to " + metrics_
+          debugLog("Incremented skipped metric to " + metrics_
               .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
         }
       } catch (CatalogException e) {
@@ -745,7 +843,7 @@ public class MetastoreEvents {
           if (dbName_.equalsIgnoreCase(dropTableEvent.dbName_) && tblName_
               .equalsIgnoreCase(dropTableEvent.tblName_)) {
             infoLog("Found table {} is removed later in event {} type {}", tblName_,
-                dropTableEvent.eventId_, dropTableEvent.eventType_);
+                dropTableEvent.getEventId(), dropTableEvent.getEventType());
             return true;
           }
         } else if (event.eventType_.equals(MetastoreEventType.ALTER_TABLE)) {
@@ -762,7 +860,7 @@ public class MetastoreEvents {
               && dbName_.equalsIgnoreCase(alterTableEvent.msTbl_.getDbName())
               && tblName_.equalsIgnoreCase(alterTableEvent.msTbl_.getTableName())) {
             infoLog("Found table {} is renamed later in event {} type {}", tblName_,
-                alterTableEvent.eventId_, alterTableEvent.eventType_);
+                alterTableEvent.getEventId(), alterTableEvent.getEventType());
             return true;
           }
         }
@@ -791,7 +889,7 @@ public class MetastoreEvents {
     InsertEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(getEventType()));
       InsertMessage insertMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
               .getInsertMessage(event.getMessage());
@@ -805,23 +903,60 @@ public class MetastoreEvents {
     }
 
     @Override
+    protected MetastoreEventType getBatchEventType() {
+      return MetastoreEventType.INSERT_PARTITIONS;
+    }
+
+    @Override
+    protected Partition getPartitionForBatching() { return insertPartition_; }
+
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      if (!(event instanceof InsertEvent)) return false;
+      InsertEvent insertEvent = (InsertEvent) event;
+      // batched events must have consecutive event ids
+      if (event.getEventId() != 1 + getEventId()) return false;
+      // make sure that the event is on the same table
+      if (!getFullyQualifiedTblName().equalsIgnoreCase(
+          insertEvent.getFullyQualifiedTblName())) {
+        return false;
+      }
+      // we currently only batch partition level insert events
+      if (this.insertPartition_ == null || insertEvent.insertPartition_ == null) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      if (!(event instanceof InsertEvent)) return null;
+      BatchPartitionEvent<InsertEvent> batchEvent = new BatchPartitionEvent<>(
+          this);
+      Preconditions.checkState(batchEvent.canBeBatched(event));
+      batchEvent.addToBatchEvents(event);
+      return batchEvent;
+    }
+
+    @Override
     public SelfEventContext getSelfEventContext() {
       if (insertPartition_ != null) {
         // create selfEventContext for insert partition event
         List<TPartitionKeyValue> tPartSpec =
             getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_);
         return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
-            insertPartition_.getParameters(), eventId_);
+            insertPartition_.getParameters(), Arrays.asList(getEventId()));
       } else {
         // create selfEventContext for insert table event
         return new SelfEventContext(
-            dbName_, tblName_, null, msTbl_.getParameters(), eventId_);
+            dbName_, tblName_, null, msTbl_.getParameters(),
+            Arrays.asList(getEventId()));
       }
     }
 
     @Override
     public void process() throws MetastoreNotificationException {
-      if (isSelfEvent(true)) {
+      if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
@@ -847,7 +982,7 @@ public class MetastoreEvents {
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails. If the partition does not exist in metastore the reload
         // method below removes it from the catalog
-        reloadPartition(insertPartition_, "INSERT");
+        reloadPartitions(Arrays.asList(insertPartition_), "INSERT event");
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                 + "partition on table {} partition {} failed. Event processing cannot "
@@ -862,9 +997,9 @@ public class MetastoreEvents {
      */
     private void processTableInserts() throws MetastoreNotificationException {
       // For non-partitioned tables, refresh the whole table.
-      Preconditions.checkArgument(insertPartition_ == null);
+      Preconditions.checkState(insertPartition_ == null);
       try {
-        reloadTableFromCatalog("INSERT", false);
+        reloadTableFromCatalog("INSERT event", false);
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(
             debugString("Refresh table {} failed. Event processing "
@@ -897,7 +1032,7 @@ public class MetastoreEvents {
     AlterTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType()));
       JSONAlterTableMessage alterTableMessage =
           (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getAlterTableMessage(event.getMessage());
@@ -937,7 +1072,7 @@ public class MetastoreEvents {
       Reference<Boolean> oldTblRemoved = new Reference<>();
       Reference<Boolean> newTblAdded = new Reference<>();
       catalogOpExecutor_
-          .renameTableFromEvent(eventId_, tableBefore_, tableAfter_, oldTblRemoved,
+          .renameTableFromEvent(getEventId(), tableBefore_, tableAfter_, oldTblRemoved,
               newTblAdded);
 
       if (oldTblRemoved.getRef()) {
@@ -948,7 +1083,7 @@ public class MetastoreEvents {
       }
       if (!oldTblRemoved.getRef() || !newTblAdded.getRef()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1011,7 +1146,8 @@ public class MetastoreEvents {
       return false;
     }
 
-    private boolean canBeSkipped() {
+    @Override
+    protected boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1070,7 +1206,7 @@ public class MetastoreEvents {
     private DropTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
       JSONDropTableMessage dropTableMessage =
           (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getDropTableMessage(event.getMessage());
@@ -1109,14 +1245,14 @@ public class MetastoreEvents {
       Reference<Boolean> tblRemovedLater = new Reference<>();
       boolean removedTable;
       removedTable = catalogOpExecutor_
-          .removeTableIfNotAddedLater(eventId_, msTbl_.getDbName(),
+          .removeTableIfNotAddedLater(getEventId(), msTbl_.getDbName(),
               msTbl_.getTableName(), tblRemovedLater);
       if (removedTable) {
         infoLog("Successfully removed table {}", getFullyQualifiedTblName());
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
       } else {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1137,7 +1273,8 @@ public class MetastoreEvents {
     private CreateDatabaseEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.CREATE_DATABASE.equals(getEventType()));
       JSONCreateDatabaseMessage createDatabaseMessage =
           (JSONCreateDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getCreateDatabaseMessage(event.getMessage());
@@ -1168,16 +1305,16 @@ public class MetastoreEvents {
     @Override
     public void process() {
       boolean dbAdded = catalogOpExecutor_
-          .addDbIfNotRemovedLater(eventId_, createdDatabase_);
+          .addDbIfNotRemovedLater(getEventId(), createdDatabase_);
       if (!dbAdded) {
         debugLog(
             "Database {} was not added since it either exists or was "
                 + "removed since the event was generated", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       } else {
-        debugLog("Successfully added database {}", dbName_);
+        infoLog("Successfully added database {}", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_DATABASES_ADDED).inc();
       }
     }
@@ -1196,7 +1333,8 @@ public class MetastoreEvents {
     private AlterDatabaseEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.ALTER_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.ALTER_DATABASE.equals(getEventType()));
       JSONAlterDatabaseMessage alterDatabaseMessage =
           (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getAlterDatabaseMessage(event.getMessage());
@@ -1252,7 +1390,8 @@ public class MetastoreEvents {
         CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event)
         throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.DROP_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.DROP_DATABASE.equals(getEventType()));
       JSONDropDatabaseMessage dropDatabaseMessage =
           (JSONDropDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getDropDatabaseMessage(event.getMessage());
@@ -1294,13 +1433,13 @@ public class MetastoreEvents {
     @Override
     public void process() {
       boolean dbRemoved = catalogOpExecutor_
-          .removeDbIfNotAddedLater(eventId_, droppedDatabase_.getName());
+          .removeDbIfNotAddedLater(getEventId(), droppedDatabase_.getName());
       if (dbRemoved) {
         infoLog("Removed Database {} ", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_DATABASES_REMOVED).inc();
       } else {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1346,7 +1485,7 @@ public class MetastoreEvents {
     private AddPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.ADD_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.ADD_PARTITION));
       if (event.getMessage() == null) {
         throw new IllegalStateException(debugString("Event message is null"));
       }
@@ -1405,7 +1544,7 @@ public class MetastoreEvents {
           // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
           // refresh of the partitions if the table is not present in the catalog.
           int numPartsAdded = catalogOpExecutor_
-              .addPartitionsIfNotRemovedLater(eventId_, dbName_, tblName_,
+              .addPartitionsIfNotRemovedLater(getEventId(), dbName_, tblName_,
                   addedPartitions_, "ADD_PARTITION");
           if (numPartsAdded != 0) {
             infoLog("Successfully added {} partitions to table {}",
@@ -1414,7 +1553,7 @@ public class MetastoreEvents {
                 .inc(numPartsAdded);
           } else {
             metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-            infoLog("Incremented skipped metric to " + metrics_
+            debugLog("Incremented skipped metric to " + metrics_
                 .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
           }
         }
@@ -1436,6 +1575,10 @@ public class MetastoreEvents {
     private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_;
     // the Partition object after alter operation, as parsed from the NotificationEvent
     private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter_;
+    // the version number from the partition parameters of the event.
+    private final long versionNumberFromEvent_;
+    // the service id from the partition parameters of the event.
+    private final String serviceIdFromEvent_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1443,7 +1586,7 @@ public class MetastoreEvents {
     private AlterPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.ALTER_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.ALTER_PARTITION));
       Preconditions.checkNotNull(event.getMessage());
       AlterPartitionMessage alterPartitionMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
@@ -1455,6 +1598,12 @@ public class MetastoreEvents {
         partitionAfter_ =
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
         msTbl_ = alterPartitionMessage.getTableObj();
+        Map<String, String> parameters = partitionAfter_.getParameters();
+        versionNumberFromEvent_ = Long.parseLong(
+            MetastoreEvents.getStringProperty(parameters,
+                MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
+        serviceIdFromEvent_ = MetastoreEvents.getStringProperty(
+            parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
       } catch (Exception e) {
         throw new MetastoreNotificationException(
             debugString("Unable to parse the alter partition message"), e);
@@ -1462,6 +1611,50 @@ public class MetastoreEvents {
     }
 
     @Override
+    protected MetastoreEventType getBatchEventType() {
+      return MetastoreEventType.ALTER_PARTITIONS;
+    }
+
+    @Override
+    protected Partition getPartitionForBatching() { return partitionAfter_; }
+
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      if (!(event instanceof AlterPartitionEvent)) return false;
+      AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
+      if (event.getEventId() != 1 + getEventId()) return false;
+      // make sure that the event is on the same table
+      if (!getFullyQualifiedTblName().equalsIgnoreCase(
+          alterPartitionEvent.getFullyQualifiedTblName())) {
+        return false;
+      }
+
+      // in case of ALTER_PARTITION we only batch together the events
+      // which have same versionNumber and serviceId from the event. This simplifies
+      // the self-event evaluation for the batch since either the whole batch is
+      // self-events or not.
+      Map<String, String> parametersFromEvent =
+          alterPartitionEvent.partitionAfter_.getParameters();
+      long versionNumberOfEvent = Long.parseLong(
+          MetastoreEvents.getStringProperty(parametersFromEvent,
+              MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
+      String serviceIdOfEvent = MetastoreEvents.getStringProperty(parametersFromEvent,
+          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+      return versionNumberFromEvent_ == versionNumberOfEvent
+          && serviceIdFromEvent_.equals(serviceIdOfEvent);
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      if (!(event instanceof AlterPartitionEvent)) return null;
+      BatchPartitionEvent<AlterPartitionEvent> batchEvent = new BatchPartitionEvent<>(
+          this);
+      Preconditions.checkState(batchEvent.canBeBatched(event));
+      batchEvent.addToBatchEvents(event);
+      return batchEvent;
+    }
+
+    @Override
     public void process() throws MetastoreNotificationException, CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
@@ -1485,7 +1678,7 @@ public class MetastoreEvents {
         List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
             partitionAfter_);
         try {
-          reloadPartition(partitionAfter_, "ALTER_PARTITION");
+          reloadPartitions(Arrays.asList(partitionAfter_), "ALTER_PARTITION event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                   + "partition on table {} partition {} failed. Event processing cannot "
@@ -1496,7 +1689,8 @@ public class MetastoreEvents {
       }
     }
 
-    private boolean canBeSkipped() {
+    @Override
+    protected boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1518,6 +1712,139 @@ public class MetastoreEvents {
     }
   }
 
+  /**
+   * This event represents a batch of events of type T. The batch of events is
+   * initialized from a single initial event called baseEvent. More events can be added
+   * to the batch using the {@code addToBatchEvents} method. Currently we only support
+   * ALTER_PARTITION and INSERT partition events for batching.
+   * @param <T> The type of event which is batched by this event.
+   */
+  public static class BatchPartitionEvent<T extends MetastoreTableEvent> extends
+      MetastoreTableEvent {
+    private final T baseEvent_;
+    private final List<T> batchedEvents_ = new ArrayList<>();
+
+    private BatchPartitionEvent(T baseEvent) {
+      super(baseEvent.catalogOpExecutor_, baseEvent.metrics_, baseEvent.event_);
+      this.msTbl_ = baseEvent.msTbl_;
+      this.baseEvent_ = baseEvent;
+      batchedEvents_.add(baseEvent);
+      // override the eventType_ to represent that this is a batch of events.
+      setEventType(baseEvent.getBatchEventType());
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      Preconditions.checkState(canBeBatched(event));
+      batchedEvents_.add((T) event);
+      return this;
+    }
+
+    @Override
+    public int getNumberOfEvents() { return batchedEvents_.size(); }
+
+    /**
+     * Return the event id of this batch event. We return the last eventId
+     * from this batch, which is important since it is used to determine the event
+     * id for fetching the next set of events from metastore.
+     */
+    @Override
+    public long getEventId() {
+      Preconditions.checkState(!batchedEvents_.isEmpty());
+      return batchedEvents_.get(batchedEvents_.size()-1).getEventId();
+    }
+
+    /**
+     *
+     * @param event The event under consideration for batching into this event. It can
+     *              be added if it can be batched with the last event of the
+     *              current batch.
+     * @return true if we can add the event to the current batch; else false.
+     */
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      Preconditions.checkState(!batchedEvents_.isEmpty());
+      return batchedEvents_.get(batchedEvents_.size()-1).canBeBatched(event);
+    }
+
+    @VisibleForTesting
+    List<T> getBatchEvents() { return batchedEvents_; }
+
+    @Override
+    protected void process() throws MetastoreNotificationException, CatalogException {
+      if (isSelfEvent()) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
+
+      // Ignore the event if this is a trivial event. See javadoc for
+      // isTrivialAlterPartitionEvent() for examples.
+      List<T> eventsToProcess = new ArrayList<>();
+      for (T event : batchedEvents_) {
+        if (!event.canBeSkipped()) {
+          eventsToProcess.add(event);
+        }
+      }
+      if (eventsToProcess.isEmpty()) {
+        LOG.info(
+            "Ignoring events from event id {} to {} since they modify parameters "
+            + " which can be ignored", getFirstEventId(), getLastEventId());
+        return;
+      }
+
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        reloadTableFromCatalog(getEventType().toString(), true);
+      } else {
+        // Reload the partitions from the batch.
+        List<Partition> partitions = new ArrayList<>();
+        for (T event : eventsToProcess) {
+          partitions.add(event.getPartitionForBatching());
+        }
+        try {
+          reloadPartitions(partitions, getEventType().toString() + " event");
+        } catch (CatalogException e) {
+          throw new MetastoreNotificationNeedsInvalidateException(String.format(
+              "Refresh partitions on table %s failed when processing event ids %s-%s. "
+              + "Issue an invalidate command to reset the event processor state.",
+              getFullyQualifiedTblName(), getFirstEventId(), getLastEventId()), e);
+        }
+      }
+    }
+
+    /**
+     * Gets the event id of the first event in the batch.
+     */
+    private long getFirstEventId() {
+      return batchedEvents_.get(0).getEventId();
+    }
+
+    /**
+     * Gets the event id of the last event in the batch.
+     */
+    private long getLastEventId() {
+      return batchedEvents_.get(batchedEvents_.size()-1).getEventId();
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      List<List<TPartitionKeyValue>> partitionKeyValues = new ArrayList<>();
+      List<Long> eventIds = new ArrayList<>();
+      // We treat insert event as a special case since the self-event context for an
+      // insert event is generated differently using the eventIds.
+      boolean isInsertEvent = baseEvent_ instanceof InsertEvent;
+      for (T event : batchedEvents_) {
+        partitionKeyValues.add(
+            getTPartitionSpecFromHmsPartition(event.msTbl_,
+                event.getPartitionForBatching()));
+        eventIds.add(event.getEventId());
+      }
+      return new SelfEventContext(dbName_, tblName_, partitionKeyValues,
+          baseEvent_.getPartitionForBatching().getParameters(),
+          isInsertEvent ? eventIds : null);
+    }
+  }
+
   public static class DropPartitionEvent extends MetastoreTableEvent {
     private final List<Map<String, String>> droppedPartitions_;
     public static final String EVENT_TYPE = "DROP_PARTITION";
@@ -1528,7 +1855,7 @@ public class MetastoreEvents {
     private DropPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.DROP_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.DROP_PARTITION));
       Preconditions.checkNotNull(event.getMessage());
       DropPartitionMessage dropPartitionMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
@@ -1566,7 +1893,7 @@ public class MetastoreEvents {
           reloadTableFromCatalog("DROP_PARTITION", true);
         } else {
           int numPartsRemoved = catalogOpExecutor_
-              .removePartitionsIfNotAddedLater(eventId_, dbName_, tblName_,
+              .removePartitionsIfNotAddedLater(getEventId(), dbName_, tblName_,
                   droppedPartitions_, "DROP_PARTITION");
           if (numPartsRemoved > 0) {
             infoLog("{} partitions dropped from table {}", numPartsRemoved,
@@ -1575,7 +1902,7 @@ public class MetastoreEvents {
                 .inc(numPartsRemoved);
           } else {
             metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-            infoLog("Incremented skipped metric to " + metrics_
+            debugLog("Incremented skipped metric to " + metrics_
                 .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
           }
         }
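
To make the batching contract above concrete, here is a minimal, self-contained sketch of
folding consecutive same-table events into batches. The SimpleEvent, Batch and
BatchingSketch names are invented for illustration only and are not classes from this
patch; the real logic additionally checks the catalog version and service id, as shown in
AlterPartitionEvent.canBeBatched().

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a single metastore event; not an Impala class.
class SimpleEvent {
  final long eventId;
  final String tableName;

  SimpleEvent(long eventId, String tableName) {
    this.eventId = eventId;
    this.tableName = tableName;
  }

  // Mirrors the idea of canBeBatched(): only the immediately following event
  // id on the same table may join the current batch.
  boolean canBeFollowedBy(SimpleEvent next) {
    return next.eventId == eventId + 1
        && tableName.equalsIgnoreCase(next.tableName);
  }
}

// Illustrative stand-in for BatchPartitionEvent.
class Batch {
  final List<SimpleEvent> events = new ArrayList<>();
  Batch(SimpleEvent first) { events.add(first); }
  SimpleEvent last() { return events.get(events.size() - 1); }
}

public class BatchingSketch {
  // Folds an ordered event list into batches of consecutive same-table events.
  static List<Batch> createBatches(List<SimpleEvent> events) {
    List<Batch> batches = new ArrayList<>();
    for (SimpleEvent e : events) {
      if (!batches.isEmpty()
          && batches.get(batches.size() - 1).last().canBeFollowedBy(e)) {
        batches.get(batches.size() - 1).events.add(e);
      } else {
        batches.add(new Batch(e));
      }
    }
    return batches;
  }

  public static void main(String[] args) {
    List<SimpleEvent> in = new ArrayList<>();
    in.add(new SimpleEvent(13, "db.tbl"));
    in.add(new SimpleEvent(14, "db.tbl"));
    in.add(new SimpleEvent(15, "db.tbl"));
    in.add(new SimpleEvent(20, "db.tbl"));
    // Prints 2: one batch for events 13-15 and one for the lone event 20.
    System.out.println(createBatches(in).size());
  }
}
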
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 1823898..c602053 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -244,6 +244,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed";
   // number of entries in the delete event log
   public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
+  // number of batch events generated
+  public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
   /**
    * Wrapper around {@link MetastoreEventsProcessor#getNextMetastoreEvents} which passes
@@ -415,6 +417,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     metrics_.addCounter(NUMBER_OF_PARTITIONS_REMOVED);
     metrics_
         .addGauge(DELETE_EVENT_LOG_SIZE, (Gauge<Integer>) deleteEventLog_::size);
+    metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
   }
 
   /**
@@ -746,7 +749,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           lastProcessedEvent = event.metastoreNotificationEvent_;
           event.processIfEnabled();
           deleteEventLog_.garbageCollect(event.getEventId());
-          lastSyncedEventId_.set(event.eventId_);
+          lastSyncedEventId_.set(event.getEventId());
         }
       }
     } catch (CatalogException e) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
index 10dd4eb..56eedfe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -32,21 +32,27 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 public class SelfEventContext {
   private final String dbName_;
   private final String tblName_;
-  private final long insertEventId_;
+  // Event ids for the insert events. In case of an un-partitioned table insert
+  // this list contains only one event id for the insert. In case of partition-level
+  // inserts, the entries map 1 to 1 with the partition key values in
+  // partitionKeyValues_. For example, insertEventIds_.get(0) is the insert event for
+  // the partition with values partitionKeyValues_.get(0), and so on.
+  private final List<Long> insertEventIds_;
+  // the partition key values for self-event evaluation at the partition level.
+  private final List<List<TPartitionKeyValue>> partitionKeyValues_;
   // version number from the event object parameters used for self-event detection
   private final long versionNumberFromEvent_;
   // service id from the event object parameters used for self-event detection
   private final String serviceidFromEvent_;
-  private final List<List<TPartitionKeyValue>> partitionKeyValues_;
 
   SelfEventContext(String dbName, String tblName,
       Map<String, String> parameters) {
-    this(dbName, tblName, null, parameters, -1);
+    this(dbName, tblName, null, parameters, null);
   }
 
   SelfEventContext(String dbName, String tblName,
       List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) {
-    this(dbName, tblName, partitionKeyValues, parameters, -1);
+    this(dbName, tblName, partitionKeyValues, parameters, null);
   }
 
   /**
@@ -58,19 +64,27 @@ public class SelfEventContext {
    * @param partitionKeyValues Partition key-values in case of self-event
    * context is for partition.
    * @param parameters this could be database, table or partition parameters.
+   * @param insertEventIds In case this is a self-event context for an insert event,
+   *                       this parameter provides the list of insert event ids which
+   *                       map to the partitions. For an unpartitioned table it must
+   *                       contain only one event id.
    */
   SelfEventContext(String dbName, @Nullable String tblName,
       @Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
-      Map<String, String> parameters, long eventId) {
+      Map<String, String> parameters, List<Long> insertEventIds) {
     Preconditions.checkNotNull(parameters);
+    // if this is for an insert event on partitions then the size of insertEventIds
+    // must be equal to the number of TPartitionKeyValues
+    Preconditions.checkArgument((partitionKeyValues == null ||
+        insertEventIds == null) || partitionKeyValues.size() == insertEventIds.size());
     this.dbName_ = Preconditions.checkNotNull(dbName);
     this.tblName_ = tblName;
     this.partitionKeyValues_ = partitionKeyValues;
-    insertEventId_ = eventId;
-    versionNumberFromEvent_ = Long.parseLong(
+    this.insertEventIds_ = insertEventIds;
+    this.versionNumberFromEvent_ = Long.parseLong(
         MetastoreEvents.getStringProperty(parameters,
             MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-    serviceidFromEvent_ = MetastoreEvents.getStringProperty(
+    this.serviceidFromEvent_ = MetastoreEvents.getStringProperty(
         parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
   }
 
@@ -82,7 +96,9 @@ public class SelfEventContext {
     return tblName_;
   }
 
-  public long getIdFromEvent() { return insertEventId_; }
+  public long getInsertEventId(int idx) {
+    return insertEventIds_.get(idx);
+  }
 
   public long getVersionNumberFromEvent() {
     return versionNumberFromEvent_;
@@ -94,4 +110,8 @@ public class SelfEventContext {
     return partitionKeyValues_ == null ?
      null : Collections.unmodifiableList(partitionKeyValues_);
   }
+
+  public boolean isInsertEventContext() {
+    return insertEventIds_ != null;
+  }
 }
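
As a small illustration of the index pairing documented above (purely hypothetical code,
not part of the patch): insertEventIds_.get(i) always describes the partition at
partitionKeyValues_.get(i), which is why the constructor requires the two lists to be the
same size when both are present.

import java.util.Arrays;
import java.util.List;

public class InsertEventPairingSketch {
  public static void main(String[] args) {
    // Two partition-level inserts: event ids and partition specs line up by index.
    List<Long> insertEventIds = Arrays.asList(101L, 102L);
    List<String> partitionSpecs =
        Arrays.asList("year=2021/month=1", "year=2021/month=2");
    // Mirrors the Preconditions.checkArgument() above: if both lists are
    // present they must be the same size.
    if (insertEventIds.size() != partitionSpecs.size()) {
      throw new IllegalArgumentException("insert event ids must map 1:1 to partitions");
    }
    for (int i = 0; i < insertEventIds.size(); ++i) {
      System.out.println("event " + insertEventIds.get(i)
          + " wrote partition " + partitionSpecs.get(i));
    }
  }
}
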
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2625b1d..5cc4bfd 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4074,17 +4074,21 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Reloads a partition if exists and has not been removed since the event was generated.
+   * Reloads the given partitions if they exist and have not been removed since the
+   * event was generated.
+   *
    * @param eventId EventId being processed.
    * @param dbName Database name for the partition
    * @param tblName Table name for the partition
-   * @param partition {@link Partition} object from the event.
+   * @param partsFromEvent List of {@link Partition} objects from the events to be
+   *                       reloaded.
    * @param reason Reason for reloading the partitions for logging purposes.
-   * @return True if the partition was reloaded; else false.
-   * @throws CatalogException
+   * @return the number of partitions which were reloaded. If the table does not exist,
+   * returns 0. Some partitions could be skipped if they don't exist anymore.
    */
-  public boolean reloadPartitionIfExists(long eventId, String dbName,
-      String tblName, Partition partition, String reason) throws CatalogException {
+  public int reloadPartitionsIfExist(long eventId, String dbName,
+      String tblName, List<Partition> partsFromEvent, String reason)
+      throws CatalogException {
     Table table = catalog_.getTable(dbName, tblName);
     if (table == null) {
       DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
@@ -4094,7 +4098,7 @@ public class CatalogOpExecutor {
         LOG.info(
             "Not reloading the partition of table {} since it was removed "
                 + "later in catalog", new TableName(dbName, tblName));
-        return false;
+        return 0;
       } else {
         throw new TableNotFoundException(
             "Table " + dbName + "." + tblName + " not found");
@@ -4103,7 +4107,7 @@ public class CatalogOpExecutor {
     if (table instanceof IncompleteTable) {
       LOG.info("Table {} is not loaded. Skipping drop partition event {}",
           table.getFullName(), eventId);
-      return false;
+      return 0;
     }
     if (!(table instanceof HdfsTable)) {
       throw new CatalogException("Partition event received on a non-hdfs table");
@@ -4113,33 +4117,32 @@ public class CatalogOpExecutor {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) table;
-      List<LiteralExpr> partExprs = hdfsTable
-          .getTypeCompatiblePartValues(partition.getValues());
-      HdfsPartition hdfsPartition = hdfsTable.getPartition(partExprs);
-      if (hdfsPartition == null) {
-        LOG.info("Not reloading the partition {} since it does not exist anymore",
-            FileUtils.makePartName(hdfsTable.getClusteringColNames(),
-                partition.getValues()));
-        return false;
-      }
-      Reference<Boolean> wasPartitionRefreshed = new Reference<>();
-      catalog_.reloadHdfsPartition(hdfsTable, hdfsPartition.getPartitionName(),
-          wasPartitionRefreshed, ThriftObjectType.NONE, reason, newCatalogVersion,
-          hdfsPartition);
-      if (wasPartitionRefreshed.getRef()) {
-        LOG.info("EventId: {} Table {} partition {} has been refreshed", eventId,
-            hdfsTable.getFullName(),
-            FileUtils.makePartName(hdfsTable.getClusteringColNames(),
-                partition.getValues()));
-      }
-      return wasPartitionRefreshed.getRef();
-    } catch (InternalException | UnsupportedEncodingException e) {
+      // some partitions from the event or the table itself
+      // may not exist in HMS anymore. Hence, we collect the names here and re-fetch
+      // the partitions from HMS.
+      List<String> partNames = new ArrayList<>();
+      for (Partition part : partsFromEvent) {
+        partNames.add(FileUtils.makePartName(hdfsTable.getClusteringColNames(),
+            part.getValues(), null));
+      }
+      int numOfPartsReloaded;
+      try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(
+            metaStoreClient.getHiveClient(), partNames, reason);
+      }
+      hdfsTable.setCatalogVersion(newCatalogVersion);
+      return numOfPartsReloaded;
+    } catch (TableLoadingException e) {
+      LOG.info("Could not reload {} partitions of table {}", partsFromEvent.size(),
+          table.getFullName(), e);
+    } catch (InternalException e) {
       throw new CatalogException(
-          "Unable to add partition for table " + table.getFullName(), e);
+          "Could not acquire lock on the table " + table.getFullName(), e);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
       table.releaseWriteLock();
     }
+    return 0;
   }
 
   public ReentrantLock getMetastoreDdlLock() {
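
The partition names that reloadPartitionsIfExist() rebuilds above come from Hive's
FileUtils.makePartName() helper. A rough sketch of what it produces (assuming
org.apache.hadoop.hive.common.FileUtils; the column and value lists here are made up):

import java.util.Arrays;
import org.apache.hadoop.hive.common.FileUtils;

public class PartNameSketch {
  public static void main(String[] args) {
    // Joins clustering column names with the partition values from the event,
    // e.g. "year=2021/month=1"; this is the key used to re-fetch the
    // partitions from HMS before reloading their file metadata.
    String partName = FileUtils.makePartName(
        Arrays.asList("year", "month"), Arrays.asList("2021", "1"));
    System.out.println(partName);
  }
}
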
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 5dfdc1b..51ba024 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
@@ -33,6 +34,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +48,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -84,7 +86,12 @@ import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
+import org.apache.impala.catalog.events.MetastoreEvents.AlterPartitionEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.BatchPartitionEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.InsertEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
@@ -151,14 +158,11 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Main test class to cover the functionality of MetastoreEventProcessor. In order to make
@@ -275,10 +279,10 @@ public class MetastoreEventsProcessorTest {
         Mockito.spy(eventsProcessor_);
     for (MetastoreEventProcessorConfig config: MetastoreEventProcessorConfig.values()) {
       String configKey = config.getValidator().getConfigKey();
-      Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
+      when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
           "")).thenReturn("false");
       if (config.equals(MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME)) {
-        Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
+        when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
             "")).thenReturn("test_custom_catalog");
       }
     }
@@ -1063,6 +1067,9 @@ public class MetastoreEventsProcessorTest {
       InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
       insertEventRequestData.setFilesAdded(newFiles);
       insertEventRequestData.setReplace(isOverwrite);
+      if (partition != null) {
+        insertEventRequestData.setPartitionVal(partition.getValues());
+      }
       partitionInsertEventInfos
           .add(insertEventRequestData);
       MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
@@ -2223,6 +2230,220 @@ public class MetastoreEventsProcessorTest {
         numberOfSelfEventsBefore + 9, selfEventsCountAfter);
   }
 
+  @Test
+  public void testEventBatching() throws Exception {
+    // create test table and generate a real ALTER_PARTITION and INSERT event on it
+    String testTblName = "testEventBatching";
+    createDatabaseFromImpala(TEST_DB_NAME, "test");
+    createTable(TEST_DB_NAME, testTblName, true);
+    List<List<String>> partVals = new ArrayList<>();
+    partVals.add(Collections.singletonList("1"));
+    partVals.add(Collections.singletonList("2"));
+    addPartitions(TEST_DB_NAME, testTblName, partVals);
+    eventsProcessor_.processEvents();
+    alterPartitionsParams(TEST_DB_NAME, testTblName, "testkey", "val", partVals);
+    // we fetch a real alter partition event so that we can generate mocks using its
+    // contents below
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    NotificationEvent alterPartEvent = null;
+    String alterPartitionEventType = "ALTER_PARTITION";
+    for (NotificationEvent event : events) {
+      if (event.getEventType().equalsIgnoreCase(alterPartitionEventType)) {
+        alterPartEvent = event;
+        break;
+      }
+    }
+    assertNotNull(alterPartEvent);
+    String alterPartMessage = alterPartEvent.getMessage();
+
+    // test insert event batching
+    org.apache.hadoop.hive.metastore.api.Table msTbl;
+    List<Partition> partitions;
+    PartitionsRequest req = new PartitionsRequest();
+    req.setDbName(TEST_DB_NAME);
+    req.setTblName(testTblName);
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      msTbl = metaStoreClient.getHiveClient().getTable(TEST_DB_NAME, testTblName);
+      partitions = metaStoreClient.getHiveClient().getPartitionsRequest(req)
+          .getPartitions();
+    }
+    assertNotNull(msTbl);
+    assertNotNull(partitions);
+    eventsProcessor_.processEvents();
+    // generate an insert event on each of the partitions
+    for (Partition part : partitions) {
+      simulateInsertIntoTableFromFS(msTbl, 1, part, false);
+    }
+    List<NotificationEvent> insertEvents = eventsProcessor_.getNextMetastoreEvents();
+    NotificationEvent insertEvent = null;
+    for (NotificationEvent event : insertEvents) {
+      if (event.getEventType().equalsIgnoreCase("INSERT")) {
+        insertEvent = event;
+        break;
+      }
+    }
+    assertNotNull(insertEvent);
+    Map<String, String> eventTypeToMessage = new HashMap<>();
+    eventTypeToMessage.put("ALTER_PARTITION", alterPartMessage);
+    eventTypeToMessage.put("INSERT", insertEvent.getMessage());
+    runEventBatchingTest(testTblName, eventTypeToMessage);
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void runEventBatchingTest(String testTblName,
+      Map<String, String> eventTypeToMessage) throws MetastoreNotificationException {
+    for (String eventType : eventTypeToMessage.keySet()) {
+      String eventMessage = eventTypeToMessage.get(eventType);
+      // we have 10 mock batchable events which should be batched into 1
+      List<MetastoreEvent> mockEvents = createMockEvents(100, 10,
+          eventType, TEST_DB_NAME, testTblName, eventMessage);
+      MetastoreEventFactory eventFactory = eventsProcessor_.getEventsFactory();
+      List<MetastoreEvent> batch = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(1, batch.size());
+      assertTrue(batch.get(0) instanceof BatchPartitionEvent);
+      BatchPartitionEvent batchEvent = (BatchPartitionEvent) batch.get(0);
+      assertEquals(10, batchEvent.getBatchEvents().size());
+      // create a batch which consists of some other events
+      // only contiguous events should be batched
+      // 13-15 mock events which can be batched
+      mockEvents = createMockEvents(13, 3,
+          eventType, TEST_DB_NAME, testTblName, eventMessage);
+      // 17-18 can be batched
+      mockEvents.addAll(createMockEvents(17, 2,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+      // event id 20 should not be batched
+      mockEvents.addAll(createMockEvents(20, 1,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+      // events 22-24 should be batched
+      mockEvents.addAll(createMockEvents(22, 3,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+
+      batch = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(4, batch.size());
+      MetastoreEvent batch1 = batch.get(0);
+      assertEquals(3, ((BatchPartitionEvent) batch1).getBatchEvents().size());
+      MetastoreEvent batch2 = batch.get(1);
+      assertEquals(2, ((BatchPartitionEvent) batch2).getBatchEvents().size());
+      MetastoreEvent batch3 = batch.get(2);
+      if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+        assertTrue(batch3 instanceof AlterPartitionEvent);
+      } else {
+        assertTrue(batch3 instanceof InsertEvent);
+      }
+      MetastoreEvent batch4 = batch.get(3);
+      assertEquals(3, ((BatchPartitionEvent) batch4).getBatchEvents().size());
+      // test to make sure that events which have a different database name are not
+      // batched
+      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+          testTblName, eventMessage);
+      mockEvents.addAll(createMockEvents(101, 1, eventType, "db1",
+          testTblName, eventMessage));
+
+      List<MetastoreEvent> batchEvents = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(2, batchEvents.size());
+      for (MetastoreEvent event : batchEvents) {
+        if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+          assertTrue(event instanceof AlterPartitionEvent);
+        } else {
+          assertTrue(event instanceof InsertEvent);
+        }
+      }
+
+      // test no batching when table name is different
+      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+          testTblName, eventMessage);
+      mockEvents.addAll(createMockEvents(101, 1, eventType, TEST_DB_NAME,
+          "testtbl", eventMessage));
+      batchEvents = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(2, batchEvents.size());
+      for (MetastoreEvent event : batchEvents) {
+        if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+          assertTrue(event instanceof AlterPartitionEvent);
+        } else {
+          assertTrue(event instanceof InsertEvent);
+        }
+      }
+    }
+    // make sure 2 events of different event types are not batched together
+    long startEventId = 17;
+    // batch 1
+    List<MetastoreEvent> mockEvents = createMockEvents(startEventId, 3,
+        "ALTER_PARTITION", TEST_DB_NAME, testTblName,
+        eventTypeToMessage.get("ALTER_PARTITION"));
+    // batch 2
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 3, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 3 : single event not-batched
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 1, "ALTER_PARTITION",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    // batch 4 : single event not-batched
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 1, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 5 : single event not-batched
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 1, "ALTER_PARTITION",
+            TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    // batch 6
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 5, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 7
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 5, "ALTER_PARTITION",
+            TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    List<MetastoreEvent> batchedEvents = eventsProcessor_.getEventsFactory()
+        .createBatchEvents(mockEvents);
+    assertEquals(7, batchedEvents.size());
+    // batch 1 should contain 3 AlterPartitionEvent
+    BatchPartitionEvent<AlterPartitionEvent> batch1 = (BatchPartitionEvent
+        <AlterPartitionEvent>) batchedEvents.get(0);
+    assertEquals(3, batch1.getNumberOfEvents());
+    // batch 2 should contain 3 InsertEvent
+    BatchPartitionEvent<InsertEvent> batch2 = (BatchPartitionEvent
+        <InsertEvent>) batchedEvents.get(1);
+    assertEquals(3, batch2.getNumberOfEvents());
+    // 3rd is a single non-batch event
+    assertTrue(batchedEvents.get(2) instanceof AlterPartitionEvent);
+    // 4th is a single non-batch insert event
+    assertTrue(batchedEvents.get(3) instanceof InsertEvent);
+    // 5th is a single non-batch alter partition event
+    assertTrue(batchedEvents.get(4) instanceof AlterPartitionEvent);
+    // 6th is batch insert event of size 5
+    BatchPartitionEvent<InsertEvent> batch6 = (BatchPartitionEvent
+        <InsertEvent>) batchedEvents.get(5);
+    assertEquals(5, batch6.getNumberOfEvents());
+    // 7th is batch of alter partitions of size 5
+    BatchPartitionEvent<AlterPartitionEvent> batch7 = (BatchPartitionEvent
+        <AlterPartitionEvent>) batchedEvents.get(6);
+    assertEquals(5, batch7.getNumberOfEvents());
+  }
+
+
+  /**
+   * Creates mock notification event from the given list of parameters. The message
+   * should be a legal parsable message from a real notification event.
+   */
+  private List<MetastoreEvent> createMockEvents(long startEventId, int numEvents,
+      String eventType, String dbName, String tblName, String message)
+      throws MetastoreNotificationException {
+    List<NotificationEvent> mockEvents = new ArrayList<>();
+    for (int i=0; i<numEvents; i++) {
+      NotificationEvent mock = Mockito.mock(NotificationEvent.class);
+      when(mock.getEventId()).thenReturn(startEventId);
+      when(mock.getEventType()).thenReturn(eventType);
+      when(mock.getDbName()).thenReturn(dbName);
+      when(mock.getTableName()).thenReturn(tblName);
+      when(mock.getMessage()).thenReturn(message);
+      mockEvents.add(mock);
+      startEventId++;
+    }
+    List<MetastoreEvent> metastoreEvents = new ArrayList<>();
+    for (NotificationEvent notificationEvent : mockEvents) {
+      metastoreEvents.add(eventsProcessor_.getEventsFactory().get(notificationEvent));
+    }
+    return metastoreEvents;
+  }
+
   private abstract class AlterTableExecutor {
     protected abstract void execute() throws Exception;
 
@@ -2391,7 +2612,7 @@ public class MetastoreEventsProcessorTest {
             + "self-event", hdfsPartition,
         catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals));
 
-    // compute stats on the table and make sure that the table and its partittions are
+    // compute stats on the table and make sure that the table and its partitions are
     // not refreshed due to the events
     alterTableComputeStats(testTblName, Arrays.asList(Arrays.asList("1"),
         Arrays.asList("2")));
@@ -3148,6 +3369,20 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private void alterPartitionsParams(String db, String tblName, String key,
+      String val, List<List<String>> partVals) throws Exception {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      List<Partition> partitions = new ArrayList<>();
+      for (List<String> partVal : partVals) {
+        Partition partition = metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
+            tblName, partVal);
+        partition.getParameters().put(key, val);
+        partitions.add(partition);
+      }
+      metaStoreClient.getHiveClient().alter_partitions(TEST_DB_NAME, tblName, partitions);
+    }
+  }
+
   /**
    * Alters trivial partition properties which must be ignored by the event processor
    */
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index e4c2089..c90275e 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -197,12 +197,9 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
 
     # get the metric before values
     EventProcessorUtils.wait_for_event_processing(self)
-    create_metric_val_before = EventProcessorUtils. \
-      get_event_processor_metric(create_metric_name, 0)
-    removed_metric_val_before = EventProcessorUtils. \
-      get_event_processor_metric(removed_metric_name, 0)
-    events_skipped_before = EventProcessorUtils. \
-      get_event_processor_metric('events-skipped', 0)
+    create_metric_val_before = EventProcessorUtils.get_int_metric(create_metric_name, 0)
+    removed_metric_val_before = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
+    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
     num_iters = 100
     for iter in xrange(num_iters):
       for q in queries:
@@ -212,22 +209,19 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
           print("Failed in {} iterations. Error {}".format(iter, str(e)))
           raise
     EventProcessorUtils.wait_for_event_processing(self)
-    create_metric_val_after = EventProcessorUtils. \
-      get_event_processor_metric(create_metric_name, 0)
-    removed_metric_val_after = EventProcessorUtils. \
-      get_event_processor_metric(removed_metric_name, 0)
-    events_skipped_after = EventProcessorUtils. \
-      get_event_processor_metric('events-skipped', 0)
-    num_delete_event_entries = EventProcessorUtils. \
-      get_event_processor_metric('delete-event-log-size', 0)
+    create_metric_val_after = EventProcessorUtils.get_int_metric(create_metric_name, 0)
+    removed_metric_val_after = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
+    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    num_delete_event_entries = EventProcessorUtils.\
+        get_int_metric('delete-event-log-size', 0)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     # None of the queries above should actually trigger a add/remove object from events
-    assert int(create_metric_val_after) == int(create_metric_val_before)
-    assert int(removed_metric_val_after) == int(removed_metric_val_before)
+    assert create_metric_val_after == create_metric_val_before
+    assert removed_metric_val_after == removed_metric_val_before
     # each query set generates 2 events and both of them should be skipped
-    assert int(events_skipped_after) == num_iters * 2 + int(events_skipped_before)
+    assert events_skipped_after == num_iters * 2 + events_skipped_before
     # make sure that there are no more entries in the delete event log
-    assert int(num_delete_event_entries) == 0
+    assert num_delete_event_entries == 0
 
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_self_events(self, unique_database):
@@ -238,6 +232,99 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_event_batching(self, unique_database):
+    """Runs queries which generate multiple ALTER_PARTITION events which must be
+    batched by the events processor. Runs as a custom cluster test to isolate the
+    metric values from other tests."""
+    testtbl = "test_event_batching"
+    test_acid_tbl = "test_event_batching_acid"
+    acid_props = self.__get_transactional_tblproperties(True)
+    # create test tables
+    self.client.execute(
+      "create table {}.{} like functional.alltypes".format(unique_database, testtbl))
+    self.client.execute(
+      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    self.client.execute(
+      "create table {}.{} (id int) partitioned by (year int, month int) {}".format(
+        unique_database, test_acid_tbl, acid_props))
+    self.client.execute(
+      "insert into {}.{} partition (year, month) "
+      "select id, year, month from functional.alltypes".format(unique_database,
+        test_acid_tbl))
+    # run compute stats from impala; this should generate 24 ALTER_PARTITION events
+    # which should be batched together into 1 or more batch events.
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_metric = "batch-events-created"
+    batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    self.client.execute("compute stats {}.{}".format(unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_2 > batch_events_1
+    # run analyze table from hive, which generates an ALTER_PARTITION event on each
+    # partition of the table
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_3 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_3 > batch_events_2
+    # in case of a transactional table, since we batch the events together, the number
+    # of tables refreshed must be far lower than the number of events generated
+    num_table_refreshes_1 = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    self.client.execute("compute stats {}.{}".format(unique_database, test_acid_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_4 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    num_table_refreshes_2 = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    # we should generate at least 1 batch event, if not more, due to the 24 consecutive
+    # ALTER_PARTITION events
+    assert batch_events_4 > batch_events_3
+    # table should not be refreshed since this is a self-event
+    assert num_table_refreshes_2 == num_table_refreshes_1
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, test_acid_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_5 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_5 > batch_events_4
+    num_table_refreshes_2 = EventProcessorUtils.get_int_metric("tables-refreshed")
+    # the analyze table from hive generates 24 ALTER_PARTITION events which should be
+    # batched into 1-2 batches (depending on timing of the event poll thread).
+    assert num_table_refreshes_2 > num_table_refreshes_1
+    assert int(num_table_refreshes_2) - int(num_table_refreshes_1) < 24
+    EventProcessorUtils.wait_for_event_processing(self)
+    # test for batching of insert events
+    batch_events_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
+    tables_refreshed_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
+    partitions_refreshed_insert = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    self.client.execute(
+      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_after_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
+    tables_refreshed_after_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
+    partitions_refreshed_after_insert = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    # this is a self-event; tables or partitions should not be refreshed
+    assert batch_events_after_insert > batch_events_insert
+    assert tables_refreshed_after_insert == tables_refreshed_insert
+    assert partitions_refreshed_after_insert == partitions_refreshed_insert
+    # run the insert from hive to make sure that the batch event refreshes all the
+    # partitions
+    self.run_stmt_in_hive(
+      "SET hive.exec.dynamic.partition.mode=nonstrict; insert into {}.{} partition"
+      " (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_after_hive = EventProcessorUtils.get_int_metric(batch_events_metric)
+    partitions_refreshed_after_hive = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    assert batch_events_after_hive > batch_events_insert
+    # 24 partitions were inserted and hence we must refresh exactly 24 partitions once.
+    assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24
+
   def __run_self_events_test(self, db_name, use_impala):
     recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_")
     # create a table similar to alltypes so that we can recover the partitions on it
@@ -412,8 +499,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.client.execute("refresh {0}.{1}".format(db_name, tbl_name))
     self_event_test_queries = [
       # ALTER_DATABASE cases
-      "alter database {0} set dbproperties ('comment'='self-event test database')".format(
-        db_name),
+      "alter database {0} set dbproperties ('comment'='self-event test "
+      "database')".format(db_name),
       "alter database {0} set owner user `test-user`".format(db_name),
       # ALTER_TABLE case
       "alter table {0}.{1} set tblproperties ('k'='v')".format(db_name, tbl_name),
@@ -455,12 +542,10 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     Gets the tables-refreshed, partitions-refreshed and events-skipped metric values
     from Metastore EventsProcessor
     """
-    tbls_refreshed_count = EventProcessorUtils.get_event_processor_metric(
-      'tables-refreshed', 0)
-    partitions_refreshed_count = EventProcessorUtils.get_event_processor_metric(
+    tbls_refreshed_count = EventProcessorUtils.get_int_metric('tables-refreshed', 0)
+    partitions_refreshed_count = EventProcessorUtils.get_int_metric(
       'partitions-refreshed', 0)
-    events_skipped_count = EventProcessorUtils.get_event_processor_metric(
-      'events-skipped', 0)
+    events_skipped_count = EventProcessorUtils.get_int_metric('events-skipped', 0)
     return int(tbls_refreshed_count), int(partitions_refreshed_count), \
       int(events_skipped_count)
 
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index 7dd2cab..f3a9380 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -181,6 +181,7 @@ class TestEventProcessing(ImpalaTestSuite):
   def test_empty_partition_events(self, unique_database):
     self._run_test_empty_partition_events(unique_database, False)
 
+  @pytest.mark.xfail(run=False, reason="IMPALA-9057")
   def test_event_based_replication(self):
     self.__run_event_based_replication_tests()
 
diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
index 4a40508..81a44c4 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -87,12 +87,13 @@ class EventProcessorUtils(object):
      return dict(pairs)
 
   @staticmethod
-  def get_event_processor_metric(metric_key, default_val=None):
-    """Returns the event processor metric from the /events catalog debug page"""
+  def get_int_metric(metric_key, default_val=None):
+    """Returns the int value of event processor metric from the /events catalogd debug
+     page"""
     metrics = EventProcessorUtils.get_event_processor_metrics()
     if metric_key not in metrics:
-      return default_val
-    return metrics[metric_key]
+      return int(default_val)
+    return int(metrics[metric_key])
 
   @staticmethod
   def get_last_synced_event_id():

[impala] 02/04: IMPALA-10914: Consistently schedule scan ranges for Iceberg tables

Posted by bi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 61964882d152edd0d369c9a912a52d5c982f3523
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Sep 17 15:44:42 2021 +0200

    IMPALA-10914: Consistently schedule scan ranges for Iceberg tables
    
    Before this patch Impala inconsistently scheduled scan ranges for
    Iceberg tables on HDFS, in local catalog mode. It did so because
    LocalIcebergTable reloaded all the file descriptors, and the HDFS
    block locations were not consistent across the reloads. Impala's
    scheduler uses the block location list for scan range assignment,
    hence the assignments were inconsistent between queries. This has
    a negative effect on caching and hence hit performance quite badly.
    
    It is redundant and expensive to reload file descriptors for each
    query in local catalog mode. This patch extends the GetPartialInfo()
    RPC with Iceberg-specific snapshot information. It means that the
    coordinator is now able to fetch Iceberg data file descriptors from
    the CatalogD. This way scan range assignment becomes consistent
    because we reuse the same file descriptors with the same block
    location information.
    
    Fixing the above revealed another bug. Before this patch we didn't
    handle self-events of Iceberg tables. When an Iceberg table is stored
    in the HiveCatalog it means that Iceberg will update the HMS table
    on modifications because it needs to update table property
    'metadata_location' (this points to the new snapshot file).
    Then Catalogd processes these modifications again when they arrive
    via the event notification mechanism. I fixed this by creating Iceberg
    transactions in which I set the catalog service ID and new catalog
    version for the Iceberg table. Since we are using transactions now,
    Iceberg has to embed all table modifications in a single ALTER TABLE
    request to HMS, and we can detect the corresponding alter event later
    via the aforementioned catalog service ID and version.
    
    Testing:
     * added e2e test for the scan range assignment
     * added e2e test for detecting self-events
    
    Change-Id: Ibb8216b37d350469b573dad7fcefdd0ee0599ed5
    Reviewed-on: http://gerrit.cloudera.org:8080/17857
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
---
 common/thrift/CatalogService.thrift                |  12 ++
 .../impala/catalog/CatalogServiceCatalog.java      |   5 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  38 ++++-
 .../org/apache/impala/catalog/IcebergTable.java    |  29 ++--
 .../main/java/org/apache/impala/catalog/Table.java |   1 +
 .../impala/catalog/iceberg/IcebergCatalogs.java    |  25 +--
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  16 +-
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |   1 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  11 ++
 .../impala/catalog/local/DirectMetaProvider.java   |   8 +
 .../impala/catalog/local/LocalIcebergTable.java    |  24 +--
 .../apache/impala/catalog/local/MetaProvider.java  |   6 +
 .../apache/impala/service/CatalogOpExecutor.java   | 168 +++++++++++++++------
 .../impala/service/IcebergCatalogOpExecutor.java   | 105 ++++++++-----
 .../java/org/apache/impala/util/IcebergUtil.java   |   8 +-
 .../queries/QueryTest/iceberg-catalogs.test        |   2 -
 .../queries/QueryTest/show-create-table.test       |   2 +-
 tests/custom_cluster/test_events_custom_configs.py |  58 +++++++
 tests/metadata/test_show_create_table.py           |   4 +-
 tests/query_test/test_iceberg.py                   |  23 +++
 tests/stress/test_insert_stress.py                 |  26 +++-
 21 files changed, 432 insertions(+), 140 deletions(-)

diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 058ab8a..1b6ab30 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -389,6 +389,10 @@ struct TTableInfoSelector {
   // Note that want_hms_partition=true will consume more space (IMPALA-7501), so only use
   // it in cases the clients do need HMS partition structs.
   12: bool want_hms_partition
+
+  // The response should contain information about the Iceberg snapshot, i.e. the snapshot
+  // id and the file descriptors.
+  13: bool want_iceberg_snapshot
 }
 
 // Returned information about a particular partition.
@@ -435,6 +439,11 @@ struct TPartialPartitionInfo {
   13: optional CatalogObjects.THdfsPartitionLocation location
 }
 
+struct TIcebergSnapshot {
+  1: required i64 snapshot_id
+  2: optional map<string, CatalogObjects.THdfsFileDesc> iceberg_file_desc_map
+}
+
 // Returned information about a Table, as selected by TTableInfoSelector.
 struct TPartialTableInfo {
   1: optional hive_metastore.Table hms_table
@@ -474,6 +483,9 @@ struct TPartialTableInfo {
   // The prefixes of locations of partitions in this table. See THdfsPartitionLocation for
   // the description of how a prefix is computed.
   11: optional list<string> partition_prefixes
+
+  // Iceberg snapshot information
+  12: optional TIcebergSnapshot iceberg_snapshot
 }
 
 struct TBriefTableMeta {
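
A rough sketch of how the new selector flag and snapshot struct fit together (the
accessor names below follow the usual Thrift Java code generation for the fields declared
above and are assumptions, not code taken from this patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.impala.thrift.THdfsFileDesc;
import org.apache.impala.thrift.TIcebergSnapshot;
import org.apache.impala.thrift.TTableInfoSelector;

public class IcebergSnapshotSelectorSketch {
  public static void main(String[] args) {
    // Coordinator side: ask catalogd to include Iceberg snapshot information
    // in the partial table info it returns.
    TTableInfoSelector selector = new TTableInfoSelector();
    selector.setWant_iceberg_snapshot(true);

    // Catalogd side: the snapshot carries the snapshot id plus the data file
    // descriptors keyed by path hash, so the coordinator can reuse the same
    // block locations and keep scan range assignment consistent across queries.
    TIcebergSnapshot snapshot = new TIcebergSnapshot();
    snapshot.setSnapshot_id(1234567890L);
    Map<String, THdfsFileDesc> fileDescByPathHash = new HashMap<>();
    snapshot.setIceberg_file_desc_map(fileDescByPathHash);

    System.out.println(selector + " " + snapshot);
  }
}
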
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 34eea1c..942698c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3363,9 +3363,8 @@ public class CatalogServiceCatalog extends Catalog {
       TGetPartialCatalogObjectResponse resp;
       table.takeReadLock();
       try {
-        if (table instanceof HdfsTable || table instanceof IcebergTable) {
-          HdfsTable hdfsTable = table instanceof HdfsTable ? (HdfsTable) table :
-              ((IcebergTable) table).getHdfsTable();
+        if (table instanceof HdfsTable) {
+          HdfsTable hdfsTable = (HdfsTable)table;
           missingPartialInfos = Maps.newHashMap();
           resp = hdfsTable.getPartialInfo(req, missingPartialInfos);
           if (missingPartialInfos.isEmpty()) return resp;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index bb06002..c25664d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -39,16 +39,19 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsCompression;
+import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
+import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TIcebergTable;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TResultSet;
@@ -382,11 +385,9 @@ public interface FeIcebergTable extends FeFsTable {
       tIcebergTable.setDefault_partition_spec_id(
           icebergTable.getDefaultPartitionSpecId());
 
-      for (Map.Entry<String, HdfsPartition.FileDescriptor> entry :
-          icebergTable.getPathHashToFileDescMap().entrySet()) {
-        tIcebergTable.putToPath_hash_to_file_descriptor(entry.getKey(),
-          entry.getValue().toThrift());
-      }
+      tIcebergTable.setPath_hash_to_file_descriptor(
+          convertPathHashToFileDescMap(icebergTable));
+
       tIcebergTable.setSnapshot_id(icebergTable.snapshotId());
       tIcebergTable.setParquet_compression_codec(
           icebergTable.getIcebergParquetCompressionCodec());
@@ -399,6 +400,33 @@ public interface FeIcebergTable extends FeFsTable {
       return tIcebergTable;
     }
 
+    public static Map<String, THdfsFileDesc> convertPathHashToFileDescMap(
+        FeIcebergTable icebergTable) {
+      Map<String, THdfsFileDesc> ret = new HashMap<>();
+      for (Map.Entry<String, HdfsPartition.FileDescriptor> entry :
+          icebergTable.getPathHashToFileDescMap().entrySet()) {
+        ret.put(entry.getKey(), entry.getValue().toThrift());
+      }
+      return ret;
+    }
+
+    public static Map<String, FileDescriptor> loadFileDescMapFromThrift(
+        Map<String, THdfsFileDesc> tFileDescMap) {
+      Map<String, FileDescriptor> fileDescMap = new HashMap<>();
+      if (tFileDescMap == null) return fileDescMap;
+      for (Map.Entry<String, THdfsFileDesc> entry : tFileDescMap.entrySet()) {
+        fileDescMap.put(entry.getKey(), FileDescriptor.fromThrift(entry.getValue()));
+      }
+      return fileDescMap;
+    }
+
+    public static TIcebergSnapshot createTIcebergSnapshot(FeIcebergTable icebergTable) {
+      TIcebergSnapshot snapshot = new TIcebergSnapshot();
+      snapshot.setSnapshot_id(icebergTable.snapshotId());
+      snapshot.setIceberg_file_desc_map(convertPathHashToFileDescMap(icebergTable));
+      return snapshot;
+    }
+
     /**
      * Get FileDescriptor by data file location
      */
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 851cd5c..91af691 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -36,6 +36,8 @@ import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCompressionCodec;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsTable;
@@ -44,6 +46,7 @@ import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TIcebergTable;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
@@ -415,7 +418,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
     icebergParquetDictPageSize_ = ticeberg.getParquet_dict_page_size();
     partitionSpecs_ = loadPartitionBySpecsFromThrift(ticeberg.getPartition_spec());
     defaultPartitionSpecId_ = ticeberg.getDefault_partition_spec_id();
-    pathHashToFileDescMap_ = loadFileDescFromThrift(
+    pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
         ticeberg.getPath_hash_to_file_descriptor());
     snapshotId_ = ticeberg.getSnapshot_id();
     hdfsTable_.loadFromThrift(thriftTable);
@@ -450,16 +453,6 @@ public class IcebergTable extends Table implements FeIcebergTable {
     return ret;
   }
 
-  private Map<String, FileDescriptor> loadFileDescFromThrift(
-      Map<String, THdfsFileDesc> tFileDescMap) {
-    Map<String, FileDescriptor> fileDescMap = new HashMap<>();
-    if (tFileDescMap == null) return fileDescMap;
-    for (Map.Entry<String, THdfsFileDesc> entry : tFileDescMap.entrySet()) {
-      fileDescMap.put(entry.getKey(), FileDescriptor.fromThrift(entry.getValue()));
-    }
-    return fileDescMap;
-  }
-
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
@@ -479,4 +472,18 @@ public class IcebergTable extends Table implements FeIcebergTable {
     }
     return hdfsTable;
   }
+
+  @Override
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
+    Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName());
+    Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new HashMap<>();
+    TGetPartialCatalogObjectResponse resp =
+        getHdfsTable().getPartialInfo(req, missingPartialInfos);
+    if (req.table_info_selector.want_iceberg_snapshot) {
+      resp.table_info.setIceberg_snapshot(
+          FeIcebergTable.Utils.createTIcebergSnapshot(this));
+    }
+    return resp;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 0b6db14..e1524d6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -707,6 +707,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     }
     return resp;
   }
+
   /**
    * @see FeCatalogUtils#parseColumnType(FieldSchema, String)
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
index 56133dd..2c1620a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
@@ -66,8 +66,8 @@ public class IcebergCatalogs implements IcebergCatalog {
   }
 
   public TIcebergCatalog getUnderlyingCatalogType(String catalogName) {
-    String catalogType = configuration_.get(catalogPropertyConfigKey(
-        catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+    String catalogType = getCatalogProperty(
+        catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE);
     if (catalogType == null ||
         CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType)) {
       return TIcebergCatalog.HIVE_CATALOG;
@@ -91,8 +91,7 @@ public class IcebergCatalogs implements IcebergCatalog {
     setContextClassLoader();
     String catName = tableProps.get(IcebergTable.ICEBERG_CATALOG);
     Preconditions.checkState(catName != null);
-    String catalogType = configuration_.get(catalogPropertyConfigKey(
-      catName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+    String catalogType = getCatalogProperty(catName, CatalogUtil.ICEBERG_CATALOG_TYPE);
     if (catalogType == null) {
       throw new ImpalaRuntimeException(
           String.format("Unknown catalog name: %s", catName));
@@ -101,6 +100,7 @@ public class IcebergCatalogs implements IcebergCatalog {
     properties.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
     properties.setProperty(InputFormatConfig.PARTITION_SPEC,
         PartitionSpecParser.toJson(spec));
+    properties.setProperty("external.table.purge", "TRUE");
     return Catalogs.createTable(configuration_, properties);
   }
 
@@ -138,6 +138,15 @@ public class IcebergCatalogs implements IcebergCatalog {
         "Cannot rename Iceberg tables that use 'Catalogs'.");
   }
 
+  /**
+   * Returns the value of 'catalogPropertyKey' for the given catalog.
+   */
+  public String getCatalogProperty(String catalogName, String catalogPropertyKey) {
+    String propKey = String.format("%s%s.%s", InputFormatConfig.CATALOG_CONFIG_PREFIX,
+        catalogName, catalogPropertyKey);
+    return configuration_.get(propKey);
+  }
+
   private Properties createPropsForCatalogs(TableIdentifier tableId, String location,
       Map<String, String> tableProps) {
     Properties properties = new Properties();
@@ -147,15 +156,11 @@ public class IcebergCatalogs implements IcebergCatalog {
     } else if (location != null) {
       properties.setProperty(Catalogs.LOCATION, location);
     }
+    properties.setProperty(IcebergTable.ICEBERG_CATALOG,
+                           tableProps.get(IcebergTable.ICEBERG_CATALOG));
     return properties;
   }
 
-  private static String catalogPropertyConfigKey(String catalogName,
-      String catalogProperty) {
-    return String.format("%s%s.%s", InputFormatConfig.CATALOG_CONFIG_PREFIX,
-        catalogName, catalogProperty);
-  }
-
   /**
    * Some of the above methods might be running on native threads as they might be invoked
    * via JNI. In that case the context class loader for those threads are null. 'Catalogs'
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 5e122e9..e0c986f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
@@ -46,6 +47,7 @@ import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergStructField;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.local.LocalDb;
 import org.apache.impala.catalog.local.LocalFsTable;
@@ -143,8 +145,15 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
   private void setLocations() {
     Preconditions.checkState(msTable_ != null);
     Preconditions.checkState(icebergCatalog_ != null);
-    if (icebergCatalog_ == TIcebergCatalog.HADOOP_CATALOG) {
-      icebergCatalogLocation_ = IcebergUtil.getIcebergCatalogLocation(msTable_);
+    TIcebergCatalog underlyingCatalog = IcebergUtil.getUnderlyingCatalog(msTable_);
+    if (underlyingCatalog == TIcebergCatalog.HADOOP_CATALOG) {
+      if (icebergCatalog_ == TIcebergCatalog.CATALOGS) {
+        String catName = msTable_.getParameters().get(IcebergTable.ICEBERG_CATALOG);
+        icebergCatalogLocation_ = IcebergCatalogs.getInstance().getCatalogProperty(
+            catName, CatalogProperties.WAREHOUSE_LOCATION);
+      } else {
+        icebergCatalogLocation_ = IcebergUtil.getIcebergCatalogLocation(msTable_);
+      }
       TableIdentifier tId = IcebergUtil.getIcebergTableIdentifier(msTable_);
       Namespace ns = tId.namespace();
       List<String> components = new ArrayList<>();
@@ -155,7 +164,8 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
       return;
     }
     Preconditions.checkState(icebergCatalog_ == TIcebergCatalog.HADOOP_TABLES ||
-                             icebergCatalog_ == TIcebergCatalog.HIVE_CATALOG);
+                             icebergCatalog_ == TIcebergCatalog.HIVE_CATALOG ||
+                             icebergCatalog_ == TIcebergCatalog.CATALOGS);
     icebergTableLocation_ = msTable_.getSd().getLocation();
     icebergCatalogLocation_ = icebergTableLocation_;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
index 3841277..d115def 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
@@ -62,6 +62,7 @@ public class IcebergHiveCatalog implements IcebergCatalog {
       PartitionSpec spec,
       String location,
       Map<String, String> properties) {
+    properties.put("external.table.purge", "TRUE");
     return hiveCatalog_.createTable(identifier, schema, spec, location, properties);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index af198ec..5531e31 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -80,6 +80,7 @@ import org.apache.impala.thrift.TFunctionName;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TTable;
@@ -1001,6 +1002,16 @@ public class CatalogdMetaProvider implements MetaProvider {
     return ret;
   }
 
+  @Override
+  public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table)
+      throws TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.want_iceberg_snapshot = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    return resp.table_info.getIceberg_snapshot();
+  }
+
   private ImmutableList<FileDescriptor> convertThriftFdList(List<THdfsFileDesc> thriftFds,
       List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) {
     List<FileDescriptor> fds = Lists.newArrayListWithCapacity(thriftFds.size());
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 0745f56..001723f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -51,6 +51,7 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TBriefTableMeta;
+import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.ListMap;
@@ -498,4 +499,11 @@ class DirectMetaProvider implements MetaProvider {
     throw new NotImplementedException(
         "getValidWriteIdList() is not implemented for DirectMetaProvider");
   }
+
+  @Override
+  public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table)
+      throws TException {
+    throw new NotImplementedException(
+        "loadIcebergSnapshot() is not implemented for DirectMetaProvider");
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 4db65d6..4c4a49a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -42,10 +42,12 @@ import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
+import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
+import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.Immutable;
@@ -98,21 +100,21 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
       ColumnMap cmap, TableMetadata metadata)
       throws TableLoadingException {
     super(db, msTable, ref, cmap);
-    tableParams_ = new TableParams(msTable);
-    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata);
-    defaultPartitionSpecId_ = metadata.defaultSpecId();
     localFsTable_ = LocalFsTable.load(db, msTable, ref);
-    if (metadata.currentSnapshot() != null) {
-      snapshotId_ = metadata.currentSnapshot().snapshotId();
-    }
-    icebergSchema_ = metadata.schema();
+    tableParams_ = new TableParams(msTable);
+    TIcebergSnapshot tSnapshot;
     try {
-      pathHashToFileDescMap_ = Utils.loadAllPartition(this);
-    } catch (IOException e) {
+      tSnapshot = db_.getCatalog().getMetaProvider().loadIcebergSnapshot(ref);
+    } catch (TException e) {
       throw new TableLoadingException(String.format(
-          "Failed to load table: %s.%s", msTable.getDbName(), msTable.getTableName()),
-          (Exception)e);
+          "Failed to load table: %s.%s", msTable.getDbName(), msTable.getTableName()), e);
     }
+    snapshotId_ = tSnapshot.getSnapshot_id();
+    partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata);
+    defaultPartitionSpecId_ = metadata.defaultSpecId();
+    icebergSchema_ = metadata.schema();
+    pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift(
+        tSnapshot.getIceberg_file_desc_map());
     icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTable);
     icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTable);
     icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTable);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index fe94f51..146a6dd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -36,6 +36,7 @@ import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TBriefTableMeta;
+import org.apache.impala.thrift.TIcebergSnapshot;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.ListMap;
@@ -124,6 +125,11 @@ public interface MetaProvider {
       List<String> colNames) throws TException;
 
   /**
+   * Loads Iceberg snapshot information, i.e. snapshot id and file descriptors.
+   */
+  public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table) throws TException;
+
+  /**
    * Reference to a table as returned by loadTable(). This reference must be passed
    * back to other functions to fetch more details about the table. Implementations
    * may use this reference to store internal information such as version numbers
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5cc4bfd..5728752 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -213,6 +214,7 @@ import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.thrift.TTableRowFormat;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTestCaseData;
@@ -946,9 +948,9 @@ public class CatalogOpExecutor {
         return;
       } else if (tbl instanceof IcebergTable &&
           altersIcebergTable(params.getAlter_type())) {
-        alterIcebergTable(params, response, (IcebergTable) tbl, newCatalogVersion,
-            wantMinimalResult);
-        return;
+        boolean needToUpdateHms = alterIcebergTable(params, response, (IcebergTable)tbl,
+            newCatalogVersion, wantMinimalResult);
+        if (!needToUpdateHms) return;
       }
       switch (params.getAlter_type()) {
         case ADD_COLUMNS:
@@ -1216,41 +1218,59 @@ public class CatalogOpExecutor {
         || type == TAlterTableType.REPLACE_COLUMNS
         || type == TAlterTableType.DROP_COLUMN
         || type == TAlterTableType.ALTER_COLUMN
-        || type == TAlterTableType.SET_PARTITION_SPEC;
+        || type == TAlterTableType.SET_PARTITION_SPEC
+        || type == TAlterTableType.SET_TBL_PROPERTIES
+        || type == TAlterTableType.UNSET_TBL_PROPERTIES;
   }
 
   /**
-   * Executes the ALTER TABLE command for a Iceberg table and reloads its metadata.
+   * Executes the ALTER TABLE command for an Iceberg table and reloads its metadata.
    */
-  private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse response,
+  private boolean alterIcebergTable(TAlterTableParams params, TDdlExecResponse response,
       IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult)
       throws ImpalaException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+    boolean needsToUpdateHms = !isIcebergHmsIntegrationEnabled(tbl.getMetaStoreTable());
     try {
+      boolean needsTxn = true;
+      org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(tbl);
       switch (params.getAlter_type()) {
         case ADD_COLUMNS:
           TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
-          IcebergCatalogOpExecutor.addColumn(tbl, addColParams.getColumns());
+          IcebergCatalogOpExecutor.addColumns(iceTxn, addColParams.getColumns());
           addSummary(response, "Column(s) have been added.");
           break;
         case DROP_COLUMN:
           TAlterTableDropColParams dropColParams = params.getDrop_col_params();
-          IcebergCatalogOpExecutor.dropColumn(tbl, dropColParams.getCol_name());
+          IcebergCatalogOpExecutor.dropColumn(iceTxn, dropColParams.getCol_name());
           addSummary(response, "Column has been dropped.");
           break;
         case ALTER_COLUMN:
           TAlterTableAlterColParams alterColParams = params.getAlter_col_params();
-          IcebergCatalogOpExecutor.alterColumn(tbl, alterColParams.getCol_name(),
-             alterColParams.getNew_col_def());
+          IcebergCatalogOpExecutor.alterColumn(iceTxn, alterColParams.getCol_name(),
+               alterColParams.getNew_col_def());
           addSummary(response, "Column has been altered.");
           break;
         case SET_PARTITION_SPEC:
+          // Set partition spec uses 'TableOperations', not transactions.
+          needsTxn = false;
+          // Partition spec is not stored in HMS.
+          needsToUpdateHms = false;
           TAlterTableSetPartitionSpecParams setPartSpecParams =
               params.getSet_partition_spec_params();
           IcebergCatalogOpExecutor.alterTableSetPartitionSpec(tbl,
-              setPartSpecParams.getPartition_spec());
+              setPartSpecParams.getPartition_spec(),
+              catalog_.getCatalogServiceId(), newCatalogVersion);
           addSummary(response, "Updated partition spec.");
           break;
+        case SET_TBL_PROPERTIES:
+          needsToUpdateHms |= !setIcebergTblProperties(tbl, params, iceTxn);
+          addSummary(response, "Updated table.");
+          break;
+        case UNSET_TBL_PROPERTIES:
+          needsToUpdateHms |= !unsetIcebergTblProperties(tbl, params, iceTxn);
+          addSummary(response, "Updated table.");
+          break;
         case REPLACE_COLUMNS:
           // It doesn't make sense to replace all the columns of an Iceberg table as it
           // would basically make all existing data unaccessible.
@@ -1259,15 +1279,57 @@ public class CatalogOpExecutor {
               "Unsupported ALTER TABLE operation for Iceberg tables: " +
               params.getAlter_type());
       }
+      if (needsTxn) {
+        if (!needsToUpdateHms) {
+          IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn,
+              catalog_.getCatalogServiceId(), newCatalogVersion);
+        }
+        iceTxn.commitTransaction();
+      }
     } catch (IllegalArgumentException ex) {
       throw new ImpalaRuntimeException(String.format(
           "Failed to ALTER table '%s': %s", params.getTable_name().table_name,
           ex.getMessage()));
     }
 
-    loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
-        params.getAlter_type().name());
-    addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
+    if (!needsToUpdateHms) {
+      // We don't need to update HMS because either it is already done by Iceberg's
+      // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
+      loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
+          params.getAlter_type().name());
+      catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
+      addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Sets table properties for an Iceberg table. Returns true on success, returns false
+   * if the operation is not applicable at the Iceberg table level, e.g. setting SERDE
+   * properties.
+   */
+  private boolean setIcebergTblProperties(IcebergTable tbl, TAlterTableParams params,
+      org.apache.iceberg.Transaction iceTxn) throws ImpalaException {
+    TAlterTableSetTblPropertiesParams setPropsParams =
+        params.getSet_tbl_properties_params();
+    if (setPropsParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return false;
+    IcebergCatalogOpExecutor.setTblProperties(iceTxn, setPropsParams.getProperties());
+    return true;
+  }
+
+  /**
+   * Unsets table properties for an Iceberg table. Returns true on success, returns false
+   * if the operation is not applicable at the Iceberg table level, e.g. setting SERDE
+   * properties.
+   */
+  private boolean unsetIcebergTblProperties(IcebergTable tbl, TAlterTableParams params,
+      org.apache.iceberg.Transaction iceTxn) throws ImpalaException {
+    TAlterTableUnSetTblPropertiesParams unsetParams =
+        params.getUnset_tbl_properties_params();
+    if (unsetParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return false;
+    IcebergCatalogOpExecutor.unsetTblProperties(iceTxn, unsetParams.getProperty_keys());
+    return true;
   }
 
   /**
@@ -2841,12 +2903,21 @@ public class CatalogOpExecutor {
     Preconditions.checkState(table instanceof FeIcebergTable);
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     catalog_.getLock().writeLock().unlock();
-    FeIcebergTable iceTable = (FeIcebergTable)table;
+    addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+        newCatalogVersion);
+    FeIcebergTable iceTbl = (FeIcebergTable)table;
     if (params.isDelete_stats()) {
       dropColumnStats(table);
       dropTableStats(table);
     }
-    IcebergCatalogOpExecutor.truncateTable(iceTable);
+    org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl);
+    IcebergCatalogOpExecutor.truncateTable(iceTxn);
+    if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
+      catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
+      IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn,
+          catalog_.getCatalogServiceId(), newCatalogVersion);
+    }
+    iceTxn.commitTransaction();
     return newCatalogVersion;
   }
 
@@ -3405,24 +3476,34 @@ public class CatalogOpExecutor {
                 .location();
             newTable.getSd().setLocation(tableLoc);
           } else {
-            if (location == null) {
-              if (IcebergUtil.getUnderlyingCatalog(newTable) !=
-                  TIcebergCatalog.HADOOP_TABLES) {
-                // When creating external Iceberg table we load
-                // the Iceberg table using catalog and table identifier to get
-                // the actual location of the table. This way we can also get the
-                // correct location for tables stored in nested namespaces.
-                TableIdentifier identifier =
-                    IcebergUtil.getIcebergTableIdentifier(newTable);
-                newTable.getSd().setLocation(IcebergUtil.loadTable(
-                    catalog, identifier,
-                    IcebergUtil.getIcebergCatalogLocation(newTable),
-                    newTable.getParameters()).location());
-              } else {
+            // If this is not a synchronized table, we assume that the table must be
+            // existing in an Iceberg Catalog.
+            TIcebergCatalog underlyingCatalog =
+                IcebergUtil.getUnderlyingCatalog(newTable);
+            String locationToLoadFrom;
+            if (underlyingCatalog == TIcebergCatalog.HADOOP_TABLES) {
+              if (location == null) {
                 addSummary(response,
                     "Location is necessary for external iceberg table.");
                 return false;
               }
+              locationToLoadFrom = location;
+            } else {
+              // For HadoopCatalog tables 'locationToLoadFrom' is the location of the
+              // hadoop catalog. For HiveCatalog tables it remains null.
+              locationToLoadFrom = IcebergUtil.getIcebergCatalogLocation(newTable);
+            }
+            TableIdentifier identifier = IcebergUtil.getIcebergTableIdentifier(newTable);
+            org.apache.iceberg.Table iceTable = IcebergUtil.loadTable(
+                catalog, identifier, locationToLoadFrom, newTable.getParameters());
+            // Populate the HMS table schema based on the Iceberg table's schema because
+            // the Iceberg metadata is the source of truth. This also avoids an
+            // unnecessary ALTER TABLE.
+            IcebergCatalogOpExecutor.populateExternalTableCols(newTable, iceTable);
+            if (location == null) {
+              // Using the location of the loaded Iceberg table we can also get the
+              // correct location for tables stored in nested namespaces.
+              newTable.getSd().setLocation(iceTable.location());
             }
           }
 
@@ -3433,20 +3514,6 @@ public class CatalogOpExecutor {
               newTable.getPartitionKeys().isEmpty());
           if (!isIcebergHmsIntegrationEnabled(newTable)) {
             msClient.getHiveClient().createTable(newTable);
-          } else {
-            // Currently HiveCatalog doesn't set the table property
-            // 'external.table.purge' during createTable().
-            org.apache.hadoop.hive.metastore.api.Table msTbl =
-                msClient.getHiveClient().getTable(
-                    newTable.getDbName(), newTable.getTableName());
-            msTbl.putToParameters("external.table.purge", "TRUE");
-            // HiveCatalog also doesn't set the table properties either.
-            for (Map.Entry<String, String> entry :
-                params.getTable_properties().entrySet()) {
-              msTbl.putToParameters(entry.getKey(), entry.getValue());
-            }
-            msClient.getHiveClient().alter_table(
-                newTable.getDbName(), newTable.getTableName(), msTbl);
           }
           events = getNextMetastoreEventsIfEnabled(eventId, event ->
               CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType())
@@ -6002,8 +6069,19 @@ public class CatalogOpExecutor {
       }
 
       if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) {
-        IcebergCatalogOpExecutor.appendFiles((FeIcebergTable)table,
+        FeIcebergTable iceTbl = (FeIcebergTable)table;
+        org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl);
+        IcebergCatalogOpExecutor.appendFiles(iceTbl, iceTxn,
             update.getIceberg_operation());
+        if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
+          // Add catalog service id and the 'newCatalogVersion' to the table parameters.
+          // This way we can avoid reloading the table on self-events (Iceberg generates
+          // an ALTER TABLE statement to set the new metadata_location).
+          IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn,
+              catalog_.getCatalogServiceId(), newCatalogVersion);
+          catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
+        }
+        iceTxn.commitTransaction();
       }
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 33a47df..5de7ef4 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -35,12 +35,15 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -80,12 +83,23 @@ public class IcebergCatalogOpExecutor {
         params.getPartition_spec());
     IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location);
     Table iceTable = icebergCatalog.createTable(identifier, schema, spec, location,
-        excludeHmsOnlyProps(params.getTable_properties()));
+        params.getTable_properties());
     LOG.info("Create iceberg table successful.");
     return iceTable;
   }
 
   /**
+   * Populates HMS table schema based on the Iceberg table's schema.
+   */
+  public static void populateExternalTableCols(
+      org.apache.hadoop.hive.metastore.api.Table msTbl, Table iceTbl)
+      throws TableLoadingException {
+    TableMetadata metadata = ((BaseTable)iceTbl).operations().current();
+    Schema schema = metadata.schema();
+    msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(schema));
+  }
+
+  /**
    * Drops Iceberg table from Iceberg's catalog.
    * Throws TableNotFoundException if table is not found and 'ifExists' is false.
    */
@@ -107,9 +121,9 @@ public class IcebergCatalogOpExecutor {
   /**
    * Adds a column to an existing Iceberg table.
    */
-  public static void addColumn(FeIcebergTable feTable, List<TColumn> columns)
+  public static void addColumns(Transaction txn, List<TColumn> columns)
       throws TableLoadingException, ImpalaRuntimeException {
-    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    UpdateSchema schema = txn.updateSchema();
     for (TColumn column : columns) {
       org.apache.iceberg.types.Type type =
           IcebergSchemaConverter.fromImpalaColumnType(column.getColumnType());
@@ -125,9 +139,9 @@ public class IcebergCatalogOpExecutor {
    *   FLOAT -> DOUBLE
    *   DECIMAL(p1,s1) -> DECIMAL(p1,s2), same scale, p1<=p2
    */
-  public static void alterColumn(FeIcebergTable feTable, String colName, TColumn newCol)
+  public static void alterColumn(Transaction txn, String colName, TColumn newCol)
       throws TableLoadingException, ImpalaRuntimeException {
-    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    UpdateSchema schema = txn.updateSchema();
     org.apache.iceberg.types.Type type =
         IcebergSchemaConverter.fromImpalaColumnType(newCol.getColumnType());
     // Cannot change a column to complex type
@@ -150,8 +164,8 @@ public class IcebergCatalogOpExecutor {
    * Sets new default partition spec for an Iceberg table.
    */
   public static void alterTableSetPartitionSpec(FeIcebergTable feTable,
-      TIcebergPartitionSpec partSpec) throws TableLoadingException,
-                                             ImpalaRuntimeException {
+      TIcebergPartitionSpec partSpec, String catalogServiceId, long catalogVersion)
+      throws TableLoadingException, ImpalaRuntimeException {
     BaseTable iceTable = (BaseTable)IcebergUtil.loadTable(feTable);
     TableOperations tableOp = iceTable.operations();
     TableMetadata metadata = tableOp.current();
@@ -159,16 +173,21 @@ public class IcebergCatalogOpExecutor {
     Schema schema = metadata.schema();
     PartitionSpec newPartSpec = IcebergUtil.createIcebergPartition(schema, partSpec);
     TableMetadata newMetadata = metadata.updatePartitionSpec(newPartSpec);
-
+    Map<String, String> properties = new HashMap<>(newMetadata.properties());
+    properties.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
+                   catalogServiceId);
+    properties.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
+                   String.valueOf(catalogVersion));
+    newMetadata = newMetadata.replaceProperties(properties);
     tableOp.commit(metadata, newMetadata);
   }
 
   /**
    * Drops a column from a Iceberg table.
    */
-  public static void dropColumn(FeIcebergTable feTable, String colName)
+  public static void dropColumn(Transaction txn, String colName)
       throws TableLoadingException, ImpalaRuntimeException {
-    UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
+    UpdateSchema schema = txn.updateSchema();
     schema.deleteColumn(colName);
     schema.commit();
   }
@@ -177,34 +196,31 @@ public class IcebergCatalogOpExecutor {
    * Rename Iceberg table
    */
   public static void renameTable(FeIcebergTable feTable, TableIdentifier tableId)
-      throws ImpalaRuntimeException{
+      throws ImpalaRuntimeException {
     IcebergCatalog catalog = IcebergUtil.getIcebergCatalog(feTable);
     catalog.renameTable(feTable, tableId);
   }
 
   /**
-   * Returns a new Map without the properties that only need to be stored at the
-   * HMS level, not at the Iceberg table level.
+   * Set TBLPROPERTIES.
    */
-  private static Map<String, String> excludeHmsOnlyProps(Map<String, String> props) {
-    Map<String, String> ret = new HashMap<>();
-    for (Map.Entry<String, String> entry : props.entrySet()) {
-      if (isHmsOnlyProperty(entry.getKey())) continue;
-      ret.put(entry.getKey(), entry.getValue());
+  public static void setTblProperties(Transaction txn, Map<String, String> properties) {
+    UpdateProperties updateProps = txn.updateProperties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      updateProps.set(entry.getKey(), entry.getValue());
     }
-    return ret;
+    updateProps.commit();
   }
 
   /**
-   * Returns true if the table property should only be stored in HMS.
-   * If false, the property is stored in HMS as well as iceberg.
+   * Unset TBLPROPERTIES
    */
-  private static boolean isHmsOnlyProperty(String propKey) {
-    if (IcebergTable.ICEBERG_FILE_FORMAT.equals(propKey)) return true;
-    if (IcebergTable.ICEBERG_CATALOG_LOCATION.equals(propKey)) return true;
-    if (IcebergTable.ICEBERG_TABLE_IDENTIFIER.equals(propKey)) return true;
-    if (CatalogOpExecutor.CAPABILITIES_KEY.equals(propKey)) return true;
-    return false;
+  public static void unsetTblProperties(Transaction txn, List<String> removeProperties) {
+    UpdateProperties updateProps = txn.updateProperties();
+    for (String prop : removeProperties) {
+      updateProps.remove(prop);
+    }
+    updateProps.commit();
   }
 
   /**
@@ -225,8 +241,8 @@ public class IcebergCatalogOpExecutor {
 
   private static class Append implements BatchWrite {
     final private AppendFiles append;
-    public Append(org.apache.iceberg.Table tbl) {
-      append = tbl.newAppend();
+    public Append(Transaction txn) {
+      append = txn.newAppend();
     }
 
     @Override
@@ -242,8 +258,8 @@ public class IcebergCatalogOpExecutor {
 
   private static class DynamicOverwrite implements BatchWrite {
     final private ReplacePartitions replace;
-    public DynamicOverwrite(org.apache.iceberg.Table tbl) {
-      replace = tbl.newReplacePartitions();
+    public DynamicOverwrite(Transaction txn) {
+      replace = txn.newReplacePartitions();
     }
 
     @Override
@@ -261,7 +277,7 @@ public class IcebergCatalogOpExecutor {
    * Append the newly inserted data files to the Iceberg table using the AppendFiles
    * API.
    */
-  public static void appendFiles(FeIcebergTable feIcebergTable,
+  public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn,
       TIcebergOperationParam icebergOp) throws ImpalaRuntimeException,
       TableLoadingException {
     org.apache.iceberg.Table nativeIcebergTable =
@@ -269,9 +285,9 @@ public class IcebergCatalogOpExecutor {
     List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb();
     BatchWrite batchWrite;
     if (icebergOp.isIs_overwrite()) {
-      batchWrite = new DynamicOverwrite(nativeIcebergTable);
+      batchWrite = new DynamicOverwrite(txn);
     } else {
-      batchWrite = new Append(nativeIcebergTable);
+      batchWrite = new Append(txn);
     }
     for (ByteBuffer buf : dataFilesFb) {
       FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
@@ -323,11 +339,24 @@ public class IcebergCatalogOpExecutor {
   /**
    * Creates new snapshot for the iceberg table by deleting all data files.
    */
-  public static void truncateTable(FeIcebergTable feIceTable)
-      throws ImpalaRuntimeException, TableLoadingException {
-    Table iceTable = IcebergUtil.loadTable(feIceTable);
-    DeleteFiles delete = iceTable.newDelete();
+  public static void truncateTable(Transaction txn)
+      throws ImpalaRuntimeException {
+    DeleteFiles delete = txn.newDelete();
     delete.deleteFromRowFilter(Expressions.alwaysTrue());
     delete.commit();
   }
+
+  /**
+   * Sets catalog service id and the new catalog version in table properties using 'txn'.
+   * This way we can avoid reloading the table on self-events.
+   */
+  public static void addCatalogVersionToTxn(Transaction txn, String serviceId,
+      long version) {
+    UpdateProperties updateProps = txn.updateProperties();
+    updateProps.set(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
+                    serviceId);
+    updateProps.set(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
+                    String.valueOf(version));
+    updateProps.commit();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 8bca3a6..364430f 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -41,7 +41,7 @@ import com.google.common.primitives.Longs;
 import org.apache.impala.catalog.IcebergStructField;
 import org.apache.impala.common.Pair;
 import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -186,12 +186,12 @@ public class IcebergUtil {
   }
 
   /**
-   * Get Iceberg UpdateSchema from 'feTable', usually use UpdateSchema to update Iceberg
+   * Get Iceberg Transaction for 'feTable', usually use Transaction to update Iceberg
    * table schema.
    */
-  public static UpdateSchema getIcebergUpdateSchema(FeIcebergTable feTable)
+  public static Transaction getIcebergTransaction(FeIcebergTable feTable)
       throws TableLoadingException, ImpalaRuntimeException {
-    return getIcebergCatalog(feTable).loadTable(feTable).updateSchema();
+    return getIcebergCatalog(feTable).loadTable(feTable).newTransaction();
   }
 
   /**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
index 3df404e..6da98c7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
@@ -104,7 +104,6 @@ DESCRIBE FORMATTED iceberg_hive_catalogs;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
 '','write.format.default','parquet             '
-'','iceberg.catalog     ','ice_hive_cat        '
 ---- TYPES
 string, string, string
 ====
@@ -136,7 +135,6 @@ DESCRIBE FORMATTED iceberg_hive_catalogs_ext;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
 '','write.format.default','parquet             '
-'','iceberg.catalog     ','ice_hive_cat        '
 '','iceberg.table_identifier','$DATABASE.iceberg_hive_catalogs'
 '','name                ','$DATABASE.iceberg_hive_catalogs'
 ---- TYPES
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index b6edbfd..dfe435e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -911,7 +911,7 @@ PARTITIONED BY SPEC (BUCKET(3, i))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
-'engine.hive.enabled'='true', 'iceberg.catalog'='ice_hive_cat', 'table_type'='ICEBERG')
+'engine.hive.enabled'='true', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_catalogs_hadoop (i int)
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index c90275e..1df40b5 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -325,6 +325,64 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     # 24 partitions inserted and hence we must refresh 24 partitions once.
     assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_iceberg_self_events(self, unique_database):
+    """This test checks that Impala doesn't refresh Iceberg tables on self events."""
+    tbl_name = unique_database + ".test_iceberg_events"
+
+    def check_self_events(query, skips_events=True):
+      tbls_refreshed_before, partitions_refreshed_before, \
+          events_skipped_before = self.__get_self_event_metrics()
+      self.client.execute(query)
+      EventProcessorUtils.wait_for_event_processing(self)
+      tbls_refreshed_after, partitions_refreshed_after, \
+          events_skipped_after = self.__get_self_event_metrics()
+      assert tbls_refreshed_before == tbls_refreshed_after
+      assert partitions_refreshed_before == partitions_refreshed_after
+      if skips_events:
+        assert events_skipped_after > events_skipped_before
+
+    hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
+    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
+        "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
+            unique_database))
+    hive_catalog = "'iceberg.catalog'='hive.catalog'"
+    hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
+    hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
+
+    all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
+        hadoop_catalogs]
+
+    for catalog in all_catalogs:
+      is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
+      self.client.execute("""
+          CREATE TABLE {0} (i int) STORED AS ICEBERG
+          TBLPROPERTIES ({1})""".format(tbl_name, catalog))
+
+      check_self_events("ALTER TABLE {0} ADD COLUMN j INT".format(tbl_name))
+      check_self_events("ALTER TABLE {0} DROP COLUMN i".format(tbl_name))
+      check_self_events("ALTER TABLE {0} CHANGE COLUMN j j BIGINT".format(tbl_name))
+      # SET PARTITION SPEC only updates HMS in case of HiveCatalog (which sets
+      # table property 'metadata_location')
+      check_self_events(
+          "ALTER TABLE {0} SET PARTITION SPEC (truncate(2, j))".format(tbl_name),
+          skips_events=is_hive_catalog)
+      check_self_events(
+          "ALTER TABLE {0} SET TBLPROPERTIES('key'='value')".format(tbl_name))
+      check_self_events("ALTER TABLE {0} UNSET TBLPROPERTIES('key')".format(tbl_name))
+      check_self_events("INSERT INTO {0} VALUES (1), (2), (3)".format(tbl_name),
+          skips_events=is_hive_catalog)
+      check_self_events("INSERT OVERWRITE {0} VALUES (4), (5), (6)".format(tbl_name),
+          skips_events=is_hive_catalog)
+      ctas_tbl = unique_database + ".ice_ctas"
+      check_self_events("""CREATE TABLE {0} STORED AS ICEBERG
+          TBLPROPERTIES ({1}) AS SELECT * FROM {2}""".format(ctas_tbl, catalog, tbl_name))
+      check_self_events("DROP TABLE {0}".format(ctas_tbl))
+      check_self_events("TRUNCATE TABLE {0}".format(tbl_name),
+          skips_events=is_hive_catalog)
+
+      self.client.execute("DROP TABLE {0}".format(tbl_name))
+
   def __run_self_events_test(self, db_name, use_impala):
     recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_")
     # create a table similar to alltypes so that we can recover the partitions on it
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index 8e82974..07844c8 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -39,7 +39,9 @@ class TestShowCreateTable(ImpalaTestSuite):
                            "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
                            "last_modified_time", "numFilesErasureCoded",
                            "bucketing_version", "OBJCAPABILITIES",
-                           "TRANSLATED_TO_EXTERNAL", "previous_metadata_location"]
+                           "TRANSLATED_TO_EXTERNAL", "previous_metadata_location",
+                           "impala.events.catalogServiceId",
+                           "impala.events.catalogVersion"]
 
   @classmethod
   def get_workload(self):
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index ca75ba0..c4e23f0 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -460,3 +460,26 @@ class TestIcebergTable(ImpalaTestSuite):
       pytest.skip('runs only in exhaustive')
     self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector,
         use_db=unique_database)
+
+  def test_consistent_scheduling(self, vector, unique_database):
+    """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for
+    Iceberg tables."""
+    def collect_split_stats(profile):
+      splits = [l.strip() for l in profile.splitlines() if "Hdfs split stats" in l]
+      splits.sort()
+      return splits
+
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute("use " + unique_database)
+      impalad_client.execute("""create table line_ice stored as iceberg
+                                as select * from tpch_parquet.lineitem""")
+      first_result = impalad_client.execute("""select count(*) from line_ice""")
+      ref_profile = first_result.runtime_profile
+      ref_split_stats = collect_split_stats(ref_profile)
+
+      for i in range(0, 10):
+        # Subsequent executions of the same query should schedule scan ranges similarly.
+        result = impalad_client.execute("""select count(*) from line_ice""")
+        profile = result.runtime_profile
+        split_stats = collect_split_stats(profile)
+        assert ref_split_stats == split_stats
diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py
index fe43a38..5caf3db 100644
--- a/tests/stress/test_insert_stress.py
+++ b/tests/stress/test_insert_stress.py
@@ -48,9 +48,16 @@ class TestInsertStress(ImpalaTestSuite):
     try:
       insert_cnt = 0
       while insert_cnt < num_inserts:
-        impalad_client.execute("insert into table %s values (%i, %i)" % (
-            tbl_name, wid, insert_cnt))
-        insert_cnt += 1
+        try:
+          impalad_client.execute("insert into table %s values (%i, %i)" % (
+              tbl_name, wid, insert_cnt))
+          insert_cnt += 1
+        except Exception as e:
+          # It's possible that the Iceberg table is concurrently updated in CatalogD
+          # during data load in local catalog.
+          if "InconsistentMetadataFetchException" in str(e):
+            continue
+          raise e
     finally:
       with counter.get_lock():
         counter.value += 1
@@ -72,9 +79,16 @@ class TestInsertStress(ImpalaTestSuite):
     impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
     try:
       while counter.value != writers:
-        result = impalad_client.execute("select * from %s" % tbl_name)
-        verify_result_set(result)
-        time.sleep(random.random())
+        try:
+          result = impalad_client.execute("select * from %s" % tbl_name)
+          verify_result_set(result)
+          time.sleep(random.random())
+        except Exception as e:
+          # It's possible that the Iceberg table is concurrently updated in CatalogD
+          # during data load in local catalog.
+          if "InconsistentMetadataFetchException" in str(e):
+            continue
+          raise e
     finally:
       impalad_client.close()
 

[impala] 04/04: IMPALA-10942: Fix memory leak in admission controller

Posted by bi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cee2b42781658f42dc08a41cf88bb2ed2803aaf1
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Sep 30 19:03:15 2021 -0700

    IMPALA-10942: Fix memory leak in admission controller
    
    This patch fixes a memory leak in the admission controller where
    objects tracking the state required for queuing were retained even
    when the query was admitted or rejected immediately.
    
    Change-Id: I1df0de0e658f6dcb5a044d108d0943aaa3dea0a1
    Reviewed-on: http://gerrit.cloudera.org:8080/17893
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-control-service.cc      | 2 +-
 be/src/scheduling/admission-controller.cc           | 6 +++---
 be/src/scheduling/admission-controller.h            | 2 +-
 be/src/scheduling/local-admission-control-client.cc | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index de034d0..374da30 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -328,7 +328,7 @@ void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
         admission_state->blacklisted_executor_addresses};
     admission_state->admit_status =
         AdmissiondEnv::GetInstance()->admission_controller()->SubmitForAdmission(request,
-            &admission_state->admit_outcome, &admission_state->schedule, &queued,
+            &admission_state->admit_outcome, &admission_state->schedule, queued,
             &admission_state->request_pool);
     admission_state->submitted = true;
     if (!queued) {
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index da79c07..432f319 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1165,9 +1165,9 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool
 
 Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
-    unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,
+    unique_ptr<QuerySchedulePB>* schedule_result, bool& queued,
     std::string* request_pool) {
-  *queued = false;
+  queued = false;
   DebugActionNoFail(request.query_options, "AC_BEFORE_ADMISSION");
   DCHECK(schedule_result->get() == nullptr);
 
@@ -1281,7 +1281,7 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
       PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, queue_node->initial_queue_reason);
 
   queue_node->wait_start_ms = MonotonicMillis();
-  *queued = true;
+  queued = true;
   return Status::OK();
 }
 
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e52a769..fec28d5 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -369,7 +369,7 @@ class AdmissionController {
   /// cancelled to ensure that the pool statistics are updated.
   Status SubmitForAdmission(const AdmissionRequest& request,
       Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
-      std::unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,
+      std::unique_ptr<QuerySchedulePB>* schedule_result, bool& queued,
       std::string* request_pool = nullptr);
 
   /// After SubmitForAdmission(), if the query was queued this must be called. If
diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc
index faf47a5..e85f34b 100644
--- a/be/src/scheduling/local-admission-control-client.cc
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -38,7 +38,7 @@ Status LocalAdmissionControlClient::SubmitForAdmission(
   query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
   bool queued;
   Status status = ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
-      request, &admit_outcome_, schedule_result, &queued);
+      request, &admit_outcome_, schedule_result, queued);
   if (queued) {
     query_events->MarkEvent(QUERY_EVENT_QUEUED);
     DCHECK(status.ok());

[impala] 03/04: IMPALA-10862 Optimization of the code structure of TmpDir

Posted by bi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5e3d4391906b09adb97ba6d036811aaf81f5a403
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Fri Jul 23 05:51:31 2021 -0700

    IMPALA-10862 Optimization of the code structure of TmpDir
    
    Currently the initialization logic of TmpFileMgr is a bit tedious.
    
    This patch simplifies TmpFileMgr::InitCustom() by refactoring parsing
    and validation logic.
    
    The patch adds TmpDirLocal, TmpDirHdfs and TmpDirS3, which inherit
    from TmpDir and implement their own parsing and validation logic.
    This makes it easier to add custom logic for future filesystems.
    
    All changes only affect interfaces within the TmpFileMgr module;
    the main logic of scratch directory parsing and verification is
    unchanged.
    
    Tests:
    Ran the Core tests and exhaustive e2e tests.
    
    Because the existing TmpFileMgrTest cases already cover TmpDir
    parsing and verification, no new test cases are needed for this
    structural change. Added some S3 directory parsing failure test
    cases.
    
    Change-Id: I52971238d5841a1cdfee06b38750f9dc99a6a2be
    Reviewed-on: http://gerrit.cloudera.org:8080/17778
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-internal.h | 114 +++++++-
 be/src/runtime/tmp-file-mgr-test.cc    | 130 ++++----
 be/src/runtime/tmp-file-mgr.cc         | 521 ++++++++++++++++++---------------
 be/src/runtime/tmp-file-mgr.h          |  50 +---
 4 files changed, 487 insertions(+), 328 deletions(-)
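
For reference, the specifier format handled by the new TmpDir::Parse() /
GetPathFromToks() split can be sketched in a standalone way. The sketch below is
illustrative only (ParseSpecifier and ParsedDir are invented names, and it skips
byte-limit/priority validation); it mirrors the token layout exercised by the
test strings in tmp-file-mgr-test.cc: a local path consumes one colon-separated
token, while an HDFS path must carry a port and therefore consumes two tokens
before the optional bytes_limit and priority.

    // Illustrative sketch only -- not the Impala implementation.
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    struct ParsedDir {
      std::string path;
      std::string bytes_limit;  // kept as a raw token; the real code parses a mem spec
      std::string priority;     // kept as a raw token; the real code parses an int
    };

    static std::vector<std::string> SplitOnColon(const std::string& s) {
      std::vector<std::string> toks;
      std::stringstream ss(s);
      std::string tok;
      while (std::getline(ss, tok, ':')) toks.push_back(tok);
      return toks;
    }

    // 'is_hdfs' decides how many leading tokens belong to the path itself.
    static bool ParseSpecifier(std::string spec, bool is_hdfs, ParsedDir* out) {
      const std::string prefix = is_hdfs ? "hdfs://" : "";
      spec = spec.substr(prefix.size());  // strip the scheme before splitting
      std::vector<std::string> toks = SplitOnColon(spec);
      if (toks.empty()) return false;
      size_t offset = 1;
      if (is_hdfs) {
        if (toks.size() < 2) return false;  // HDFS paths must include a port number
        out->path = prefix + toks[0] + ":" + toks[1];
        offset = 2;
      } else {
        out->path = toks[0];
      }
      if (toks.size() > offset + 2) return false;  // too many colons
      if (toks.size() > offset) out->bytes_limit = toks[offset];
      if (toks.size() > offset + 1) out->priority = toks[offset + 1];
      return true;
    }

    int main() {
      ParsedDir d;
      ParseSpecifier("/tmp/tmp-file-mgr-test1:5g:1", /*is_hdfs=*/false, &d);
      std::cout << d.path << " limit=" << d.bytes_limit << " prio=" << d.priority << "\n";
      ParseSpecifier("hdfs://localhost:20500/tmp:1KB:1", /*is_hdfs=*/true, &d);
      std::cout << d.path << " limit=" << d.bytes_limit << " prio=" << d.priority << "\n";
      return 0;
    }

Keeping path extraction in a virtual GetPathFromToks() is what lets TmpDirHdfs
consume the extra port token while TmpDirLocal and TmpDirS3 share the default
one-token behaviour, as the diff below shows.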

diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 1827e4f..cbdebec 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -27,6 +27,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/file-writer.h"
 #include "runtime/tmp-file-mgr.h"
+#include "util/hdfs-util.h"
 
 namespace impala {
 
@@ -94,7 +95,7 @@ class TmpFile {
   std::string DebugString();
 
   /// Helper to get the TmpDir that this file is associated with.
-  TmpFileMgr::TmpDir* GetDir();
+  TmpDir* GetDir();
 
   /// Helper to get the TmpFileGroup that this file is associated with.
   TmpFileGroup* FileGroup() const { return file_group_; }
@@ -213,7 +214,7 @@ class TmpFileRemote : public TmpFile {
 
   bool AllocateSpace(int64_t num_bytes, int64_t* offset);
   io::DiskFile* GetWriteFile();
-  TmpFileMgr::TmpDir* GetLocalBufferDir() const;
+  TmpDir* GetLocalBufferDir() const;
   Status Remove();
 
   /// Returns the size of the file.
@@ -308,6 +309,115 @@ class TmpFileDummy : public TmpFile {
   Status Remove() { return Status::OK(); }
 };
 
+/// A configured temporary directory that TmpFileMgr allocates files in.
+class TmpDir {
+ public:
+  TmpDir(const std::string& raw_path, const std::string& prefix, bool is_local);
+  virtual ~TmpDir() {}
+
+  /// Parse the raw path and identify the scratch directory options.
+  virtual Status Parse();
+
+  /// Verify the scratch path and create the directory.
+  virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) = 0;
+
+  int64_t bytes_limit() { return bytes_limit_; }
+  int priority() { return priority_; }
+  const string& path() { return path_; }
+  IntGauge* bytes_used_metric() const { return bytes_used_metric_; }
+  bool is_local() { return is_local_dir_; }
+
+ private:
+  friend class TmpFileMgr;
+  friend class TmpDirHdfs;
+  friend class TmpDirS3;
+  friend class TmpDirLocal;
+
+  /// Raw path of the temporary directory.
+  const std::string raw_path_;
+
+  /// Parsed raw path of the temporary directory, e.g., trimmed.
+  std::string parsed_raw_path_;
+
+  /// The prefix of the path.
+  std::string prefix_;
+
+  /// The complete path to the temporary directory.
+  std::string path_;
+
+  /// Limit on bytes that should be written to this path. Set to maximum value
+  /// of int64_t if there is no limit.
+  int64_t bytes_limit_;
+
+  /// Scratch directory priority.
+  int priority_;
+
+  /// The current bytes of scratch used for this temporary directory.
+  IntGauge* bytes_used_metric_;
+
+  /// If the dir is expected in the local file system or in the remote.
+  const bool is_local_dir_;
+
+  /// Indicate if the TmpDir is parsed.
+  bool is_parsed_;
+
+  /// Return the directory path by parsing the input tokens.
+  /// "Path" is the path generated from the tokens.
+  /// "Offset" indicates the number of elements has been read in the tokens.
+  virtual Status GetPathFromToks(
+      const std::vector<string>& tokens, string* path, int* offset);
+
+  /// A helper function for ParseTokens() to parse the raw path and generate the complete
+  /// path of the scratch directory.
+  /// "Offset" indicates the number of elements has been read in the tokens.
+  Status ParsePath(const std::vector<string>& tokens, int* offset);
+
+  /// A helper function for ParseTokens() to parse the byte limit of the scratch
+  /// directory. "Index" indicates the position of the byte_limit in the tokens.
+  Status ParseByteLimit(const std::vector<string>& tokens, int index);
+
+  /// A helper function for ParseTokens() to parse the priority of the scratch directory.
+  /// "Index" indicates the position of the priority in the tokens.
+  Status ParsePriority(const std::vector<string>& tokens, int index);
+
+  /// A helper function for Parse() to parse raw input of the scratch directory.
+  Status ParseTokens();
+};
+
+class TmpDirLocal : public TmpDir {
+ public:
+  TmpDirLocal(const std::string& path) : TmpDir(path, "", true /*is_local*/) {}
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+
+ private:
+  /// A helper function for VerifyAndCreate() to create a local scratch directory.
+  Status CreateLocalDirectory(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr);
+};
+
+class TmpDirS3 : public TmpDir {
+ public:
+  TmpDirS3(const std::string& path)
+    : TmpDir(path, FILESYS_PREFIX_S3, false /*is_local*/) {}
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+};
+
+class TmpDirHdfs : public TmpDir {
+ public:
+  TmpDirHdfs(const std::string& path)
+    : TmpDir(path, FILESYS_PREFIX_HDFS, false /*is_local*/) {}
+  Status Parse() override;
+  Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+      bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override;
+
+ private:
+  virtual Status GetPathFromToks(
+      const std::vector<string>& tokens, string* path, int* offset) override;
+};
+
 /// Temporary file buffer pool allows the temporary files to return their buffer to the
 /// pool and can be evicted to make room for other files. The pool also provides an async
 /// way for the write ranges to wait until there is an available space to reserve before
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index fcc4ea8..57cd9a7 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -171,13 +171,13 @@ class TmpFileMgrTest : public ::testing::Test {
   }
 
   /// Helper to get the private tmp_dirs_ member.
-  static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) {
+  static const vector<unique_ptr<TmpDir>>& GetTmpDirs(TmpFileMgr* mgr) {
     return mgr->tmp_dirs_;
   }
 
   /// Helper to get the private tmp_remote_dirs_ pointer.
-  static const TmpFileMgr::TmpDir* GetTmpRemoteDir(TmpFileMgr* mgr) {
-    return mgr->tmp_dirs_remote_.get();
+  static const unique_ptr<TmpDir>& GetTmpRemoteDir(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_;
   }
 
   /// Helper to call the private TmpFileMgr::NewFile() method.
@@ -211,7 +211,7 @@ class TmpFileMgrTest : public ::testing::Test {
   /// Helper to set an invalid remote path to create an error.
   static void SetInvalidRemotePath(
       TmpFileMgr* tmp_file_mgr, TmpFileGroup* group, int tmp_file_idx) {
-    string dir = tmp_file_mgr->tmp_dirs_remote_->path;
+    const string& dir = tmp_file_mgr->tmp_dirs_remote_->path();
     int dev_id = 0;
     string invalid_path = "";
     auto tmp_file_shared_ptr = group->tmp_files_remote_[tmp_file_idx];
@@ -968,12 +968,12 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                        "/tmp/tmp-file-mgr-test5:200tb:5,"
                        "/tmp/tmp-file-mgr-test6:100MB:6"));
   EXPECT_EQ(6, dirs.size());
-  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
-  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
-  EXPECT_EQ(1234, dirs[2].bytes_limit);
-  EXPECT_EQ(99999999, dirs[3].bytes_limit);
-  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
-  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
+  EXPECT_EQ(5 * GIGABYTE, dirs[0]->bytes_limit());
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1]->bytes_limit());
+  EXPECT_EQ(1234, dirs[2]->bytes_limit());
+  EXPECT_EQ(99999999, dirs[3]->bytes_limit());
+  EXPECT_EQ(200 * TERABYTE, dirs[4]->bytes_limit());
+  EXPECT_EQ(100 * MEGABYTE, dirs[5]->bytes_limit());
 
   // Various invalid limit formats result in the directory getting skipped.
   // Include a valid dir on the end to ensure that we don't short-circuit all
@@ -984,8 +984,8 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                        "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
                        "/tmp/tmp-file-mgr-test1:100"));
   EXPECT_EQ(1, dirs2.size());
-  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
-  EXPECT_EQ(100, dirs2[0].bytes_limit);
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0]->path());
+  EXPECT_EQ(100, dirs2[0]->bytes_limit());
 
   // Various valid ways of specifying "unlimited".
   auto& dirs3 =
@@ -993,7 +993,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
                                   "/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0"));
   EXPECT_EQ(4, dirs3.size());
   for (const auto& dir : dirs3) {
-    EXPECT_EQ(numeric_limits<int64_t>::max(), dir.bytes_limit);
+    EXPECT_EQ(numeric_limits<int64_t>::max(), dir->bytes_limit());
   }
 
   // Extra colons
@@ -1015,36 +1015,36 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
       "/tmp/local-buffer-dir2", "/tmp/local-buffer-dir3"});
 
   // Successful cases for HDFS paths.
-  auto dirs1 = GetTmpRemoteDir(
+  auto& dirs1 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs1);
 
-  auto dirs2 = GetTmpRemoteDir(
+  auto& dirs2 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp:100,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs2);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs2->path);
-  EXPECT_EQ(100, dirs2->bytes_limit);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs2->path());
+  EXPECT_EQ(100, dirs2->bytes_limit());
 
-  auto dirs3 = GetTmpRemoteDir(
+  auto& dirs3 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp:1KB:1,/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs3);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs3->path);
-  EXPECT_EQ(1024, dirs3->bytes_limit);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs3->path());
+  EXPECT_EQ(1024, dirs3->bytes_limit());
 
   // Multiple local paths with one remote path.
   auto tmp_mgr_4 = CreateTmpFileMgr("hdfs://localhost:20500/tmp,/tmp/local-buffer-dir1,"
                                     "/tmp/local-buffer-dir2,/tmp/local-buffer-dir3");
-  auto dirs4_local = GetTmpDirs(tmp_mgr_4);
-  auto dirs4_remote = GetTmpRemoteDir(tmp_mgr_4);
+  auto& dirs4_local = GetTmpDirs(tmp_mgr_4);
+  auto& dirs4_remote = GetTmpRemoteDir(tmp_mgr_4);
   EXPECT_NE(nullptr, dirs4_remote);
   EXPECT_EQ(2, dirs4_local.size());
-  EXPECT_EQ("/tmp/local-buffer-dir2/impala-scratch", dirs4_local[0].path);
-  EXPECT_EQ("/tmp/local-buffer-dir3/impala-scratch", dirs4_local[1].path);
+  EXPECT_EQ("/tmp/local-buffer-dir2/impala-scratch", dirs4_local[0]->path());
+  EXPECT_EQ("/tmp/local-buffer-dir3/impala-scratch", dirs4_local[1]->path());
 
   // Fails the parsing due to no port number for the HDFS path.
   auto tmp_mgr_5 = CreateTmpFileMgr("hdfs://localhost/tmp,/tmp/local-buffer-dir");
-  auto dirs5_local = GetTmpDirs(tmp_mgr_5);
-  auto dirs5_remote = GetTmpRemoteDir(tmp_mgr_5);
+  auto& dirs5_local = GetTmpDirs(tmp_mgr_5);
+  auto& dirs5_remote = GetTmpRemoteDir(tmp_mgr_5);
   EXPECT_EQ(1, dirs5_local.size());
   EXPECT_EQ(nullptr, dirs5_remote);
 
@@ -1054,60 +1054,76 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
 
   // Parse successfully, but the parsed HDFS path is unable to connect.
   // These cases would fail the initialization of TmpFileMgr.
-  auto dirs7 = GetTmpRemoteDir(
+  auto& dirs7 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:1/tmp::1,/tmp/local-buffer-dir", false));
   EXPECT_EQ(nullptr, dirs7);
 
-  auto dirs8 = GetTmpRemoteDir(
+  auto& dirs8 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:/tmp::,/tmp/local-buffer-dir", false));
   EXPECT_EQ(nullptr, dirs8);
 
-  auto dirs9 = GetTmpRemoteDir(
+  auto& dirs9 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost/tmp::1,/tmp/local-buffer-dir", false));
   EXPECT_EQ(nullptr, dirs9);
 
-  auto dirs10 = GetTmpRemoteDir(
+  auto& dirs10 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost/tmp:1,/tmp/local-buffer-dir", false));
   EXPECT_EQ(nullptr, dirs10);
 
   // Multiple remote paths, should support only one.
-  auto dirs11 = GetTmpRemoteDir(
+  auto& dirs11 = GetTmpRemoteDir(
       CreateTmpFileMgr("hdfs://localhost:20500/tmp,hdfs://localhost:20501/tmp,"
                        "/tmp/local-buffer-dir"));
   EXPECT_NE(nullptr, dirs11);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs11->path);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs11->path());
 
   // The order of the buffer and the remote dir should not affect the result.
-  auto dirs12 = GetTmpRemoteDir(
+  auto& dirs12 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, hdfs://localhost:20500/tmp,"
                        "hdfs://localhost:20501/tmp"));
   EXPECT_NE(nullptr, dirs12);
-  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs12->path);
+  EXPECT_EQ("hdfs://localhost:20500/tmp/impala-scratch", dirs12->path());
 
   // Successful cases for parsing S3 paths.
   // Create a fake s3 connection in order to pass the connection verification.
   HdfsFsCache::HdfsFsMap fake_hdfs_conn_map;
   hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1);
   fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn));
-  auto dirs13 = GetTmpRemoteDir(
+  auto& dirs13 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, s3a://fake_host/for-parsing-test-only",
           true, &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs13);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs13->path);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs13->path());
 
-  auto dirs14 = GetTmpRemoteDir(
+  auto& dirs14 = GetTmpRemoteDir(
       CreateTmpFileMgr("/tmp/local-buffer-dir, s3a://fake_host/for-parsing-test-only:100",
           true, &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs14);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs14->path);
-  EXPECT_EQ(100, dirs14->bytes_limit);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs14->path());
+  EXPECT_EQ(100, dirs14->bytes_limit());
 
-  auto dirs15 = GetTmpRemoteDir(CreateTmpFileMgr(
+  auto& dirs15 = GetTmpRemoteDir(CreateTmpFileMgr(
       "/tmp/local-buffer-dir, s3a://fake_host/for-parsing-test-only:1KB:1", true,
       &fake_hdfs_conn_map));
   EXPECT_NE(nullptr, dirs15);
-  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs15->path);
-  EXPECT_EQ(1024, dirs15->bytes_limit);
+  EXPECT_EQ("s3a://fake_host/for-parsing-test-only/impala-scratch", dirs15->path());
+  EXPECT_EQ(1024, dirs15->bytes_limit());
+
+  // Failure cases for parsing S3 paths.
+  auto& dirs16 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, s3a://fake_host:1234/for-parsing-test-only:1KB:1", true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs16);
+
+  auto& dirs17 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, s3a://fake_host:1234/for-parsing-test-only:1KB", true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs17);
+
+  auto& dirs18 = GetTmpRemoteDir(CreateTmpFileMgr(
+      "/tmp/local-buffer-dir, s3a://fake_host:1234/for-parsing-test-only", true,
+      &fake_hdfs_conn_map));
+  EXPECT_EQ(nullptr, dirs18);
 }
 
 // Test compression buffer memory management for reads and writes.
@@ -1268,18 +1284,18 @@ TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
                        "/tmp/tmp-file-mgr-test2::2,/tmp/tmp-file-mgr-test4:99999999:4,"
                        "/tmp/tmp-file-mgr-test5:200tb:5,/tmp/tmp-file-mgr-test1:5g:1"));
   EXPECT_EQ(6, dirs.size());
-  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
-  EXPECT_EQ(1, dirs[0].priority);
-  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
-  EXPECT_EQ(2, dirs[1].priority);
-  EXPECT_EQ(1234, dirs[2].bytes_limit);
-  EXPECT_EQ(3, dirs[2].priority);
-  EXPECT_EQ(99999999, dirs[3].bytes_limit);
-  EXPECT_EQ(4, dirs[3].priority);
-  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
-  EXPECT_EQ(5, dirs[4].priority);
-  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
-  EXPECT_EQ(numeric_limits<int>::max(), dirs[5].priority);
+  EXPECT_EQ(5 * GIGABYTE, dirs[0]->bytes_limit());
+  EXPECT_EQ(1, dirs[0]->priority());
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1]->bytes_limit());
+  EXPECT_EQ(2, dirs[1]->priority());
+  EXPECT_EQ(1234, dirs[2]->bytes_limit());
+  EXPECT_EQ(3, dirs[2]->priority());
+  EXPECT_EQ(99999999, dirs[3]->bytes_limit());
+  EXPECT_EQ(4, dirs[3]->priority());
+  EXPECT_EQ(200 * TERABYTE, dirs[4]->bytes_limit());
+  EXPECT_EQ(5, dirs[4]->priority());
+  EXPECT_EQ(100 * MEGABYTE, dirs[5]->bytes_limit());
+  EXPECT_EQ(numeric_limits<int>::max(), dirs[5]->priority());
 
   // Various invalid limit formats result in the directory getting skipped.
   // Include a valid dir on the end to ensure that we don't short-circuit all
@@ -1290,9 +1306,9 @@ TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
                        "/tmp/tmp-file-mgr-test5::p0,/tmp/tmp-file-mgr-test6::10%,"
                        "/tmp/tmp-file-mgr-test1:100:-1"));
   EXPECT_EQ(1, dirs2.size());
-  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
-  EXPECT_EQ(100, dirs2[0].bytes_limit);
-  EXPECT_EQ(-1, dirs2[0].priority);
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0]->path());
+  EXPECT_EQ(100, dirs2[0]->bytes_limit());
+  EXPECT_EQ(-1, dirs2[0]->priority());
 }
 
 // Tests that when TmpFileGroup is constructed, the priority based index ranges are
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index c18463f..d494284 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -154,7 +154,6 @@ static const Status& TMP_FILE_BUFFER_POOL_CONTEXT_CANCELLED =
     Status::CancelledInternal("TmpFileBufferPool");
 
 using DeviceId = TmpFileMgr::DeviceId;
-using TmpDir = TmpFileMgr::TmpDir;
 using WriteDoneCallback = TmpFileMgr::WriteDoneCallback;
 
 TmpFileMgr::TmpFileMgr() {}
@@ -187,6 +186,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
     MetricGroup* metrics) {
   DCHECK(!initialized_);
   punch_holes_ = punch_holes;
+  one_dir_per_device_ = one_dir_per_device;
   if (tmp_dir_specifiers.empty()) {
     LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
   }
@@ -249,41 +249,31 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
   // warning - we don't want to abort process startup because of misconfigured scratch,
   // since queries will generally still be runnable.
   for (const string& tmp_dir_spec : tmp_dir_specifiers) {
-    string tmp_dirs_without_prefix, prefix;
     string tmp_dir_spec_trimmed(boost::algorithm::trim_left_copy(tmp_dir_spec));
-    bool is_hdfs = false;
-    bool is_remote = false;
+    std::unique_ptr<TmpDir> tmp_dir;
 
     if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)) {
-      prefix = FILESYS_PREFIX_HDFS;
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(strlen(FILESYS_PREFIX_HDFS));
-      is_hdfs = true;
-      is_remote = true;
+      tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed);
     } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) {
-      prefix = FILESYS_PREFIX_S3;
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(strlen(FILESYS_PREFIX_S3));
-      is_remote = true;
       // Initialize the S3 options for later getting S3 connection.
       s3a_options_ = {make_pair("fs.s3a.fast.upload", "true"),
           make_pair("fs.s3a.fast.upload.buffer", "disk")};
-    } else {
+      tmp_dir = std::make_unique<TmpDirS3>(tmp_dir_spec_trimmed);
+    } else if (IsGcsPath(tmp_dir_spec_trimmed.c_str(), false)) {
       // TODO(IMPALA-10561): Add support for spilling to GCS
-      prefix = "";
-      tmp_dirs_without_prefix = tmp_dir_spec_trimmed.substr(0);
+    } else {
+      tmp_dir = std::make_unique<TmpDirLocal>(tmp_dir_spec_trimmed);
     }
 
-    string parsed_path;
-    int64_t bytes_limit = numeric_limits<int64_t>::max();
-    int priority = numeric_limits<int>::max();
-    Status parse_status = ParseScratchPathToks(tmp_dir_spec, tmp_dirs_without_prefix,
-        is_hdfs, &parsed_path, &bytes_limit, &priority);
+    DCHECK(tmp_dir != nullptr);
+    Status parse_status = tmp_dir->Parse();
     if (!parse_status.ok()) {
       LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because "
                    << parse_status.msg().msg();
       continue;
     }
 
-    if (is_remote) {
+    if (!tmp_dir->is_local()) {
       // Set the flag to reserve a local dir for buffer.
       // If the flag has been set, meaning that there is already one remote dir
       // registered, since we only support one remote dir, this remote dir will be
@@ -296,78 +286,38 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
         need_local_buffer_dir = true;
       }
     }
-
-    tmp_dirs.emplace_back(
-        new TmpDir(prefix.append(parsed_path), bytes_limit, priority, nullptr));
+    tmp_dirs.emplace_back(move(tmp_dir));
   }
 
   vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
   // For each tmp directory, find the disk it is on,
   // so additional tmp directories on the same disk can be skipped.
   for (int i = 0; i < tmp_dirs.size(); ++i) {
-    Status status;
-    path tmp_path(trim_right_copy_if(tmp_dirs[i]->path, is_any_of("/")));
-    bool is_hdfs_path = IsHdfsPath(tmp_path.c_str(), false);
-    bool is_s3a_path = IsS3APath(tmp_path.c_str(), false);
-    if (is_hdfs_path || is_s3a_path) {
-      // Should be only one remote dir.
-      DCHECK(!HasRemoteDir());
-      path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
-      string scratch_subdir_path_str = scratch_subdir_path.string();
-      hdfsFS hdfs_conn;
-      // If the HDFS path doesn't exist, it would fail while uploading, so we
-      // create the HDFS path if it doesn't exist.
-      // For the S3 path, it doesn't need to create the directory for the uploading
-      // as long as the S3 address is correct.
-      if (is_hdfs_path) {
-        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-            tmp_path.string(), &hdfs_conn, &hdfs_conns_));
-        if (hdfsExists(hdfs_conn, scratch_subdir_path_str.c_str()) != 0) {
-          if (hdfsCreateDirectory(hdfs_conn, scratch_subdir_path_str.c_str()) != 0) {
-            return Status(
-                GetHdfsErrorMsg("HDFS create path failed: ", scratch_subdir_path_str));
-          }
-        }
+    Status status = tmp_dirs[i]->VerifyAndCreate(
+        metrics, &is_tmp_dir_on_disk, need_local_buffer_dir, this);
+    if (!status.ok()) {
+      // If the remote directory fails to verify or create, return the error.
+      if (!tmp_dirs[i]->is_local()) return status;
+      // If it is the local directory, continue to try next directory.
+      continue;
+    }
+    if (tmp_dirs[i]->is_local()) {
+      if (need_local_buffer_dir) {
+        local_buff_dir_ = move(tmp_dirs[i]);
+        need_local_buffer_dir = false;
       } else {
-        DCHECK(is_s3a_path);
-        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-            tmp_path.string(), &hdfs_conn, &hdfs_conns_, s3a_options()));
+        tmp_dirs_.emplace_back(move(tmp_dirs[i]));
       }
-      tmp_dirs_remote_ =
-          std::make_unique<TmpDir>(scratch_subdir_path_str, tmp_dirs[i]->bytes_limit,
-              tmp_dirs[i]->priority, tmp_dirs[i]->bytes_used_metric, false);
     } else {
-      tmp_path = absolute(tmp_path);
-      path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
-      // tmp_path must be a writable directory.
-      status = FileSystemUtil::VerifyIsDirectory(tmp_path.string());
-      if (!status.ok()) {
-        LOG(WARNING) << "Cannot use directory " << tmp_path.string()
-                     << " for scratch: " << status.msg().msg();
-        continue;
-      }
-
-      // Find the disk id of tmp_path. Add the scratch directory if there isn't another
-      // directory on the same disk (or if we don't know which disk it is on).
-      int disk_id = DiskInfo::disk_id(tmp_path.c_str());
-      if (!one_dir_per_device || disk_id < 0 || !is_tmp_dir_on_disk[disk_id]) {
-        uint64_t available_space;
-        RETURN_IF_ERROR(
-            FileSystemUtil::GetSpaceAvailable(tmp_path.string(), &available_space));
-        if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) {
-          LOG(WARNING) << "Filesystem containing scratch directory " << tmp_path
-                       << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB
-                       << "MB available.";
-        }
-        RETURN_IF_ERROR(CreateDirectory(scratch_subdir_path.string(), tmp_path.string(),
-            tmp_dirs[i], metrics, &is_tmp_dir_on_disk, &need_local_buffer_dir, disk_id));
-      }
+      tmp_dirs_remote_ = move(tmp_dirs[i]);
     }
   }
 
   // Sort the tmp directories by priority.
   std::sort(tmp_dirs_.begin(), tmp_dirs_.end(),
-      [](const TmpDir& a, const TmpDir& b) { return a.priority < b.priority; });
+      [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) {
+        return a->priority_ < b->priority_;
+      });
 
   if (HasRemoteDir()) {
     if (local_buff_dir_ == nullptr) {
@@ -375,13 +325,13 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
       // s3 fast upload directly without a buffer.
       return Status(
           Substitute("No local directory configured for remote scratch space:  $0",
-              tmp_dirs_remote_->path));
+              tmp_dirs_remote_->path_));
     } else {
-      LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path
-                << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit);
+      LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path_ << " limit: "
+                << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit_);
       IntGauge* bytes_used_metric = metrics->AddGauge(
           SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size()));
-      tmp_dirs_remote_->bytes_used_metric = bytes_used_metric;
+      tmp_dirs_remote_->bytes_used_metric_ = bytes_used_metric;
     }
   }
 
@@ -396,10 +346,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
     num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   }
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path);
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i]->path_);
   }
   if (HasRemoteDir()) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path);
+    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path_);
     RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics));
   }
 
@@ -417,116 +367,6 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
   return Status::OK();
 }
 
-Status TmpFileMgr::ParseScratchPathToks(const string& tmp_dir_spec,
-    const string& tmp_dirs_without_prefix, bool is_hdfs, string* path,
-    int64_t* bytes_limit, int* priority) {
-  vector<string> toks;
-  split(toks, tmp_dirs_without_prefix, is_any_of(":"), token_compress_off);
-  // The scratch path may have two options "bytes limit" and "priority".
-  // toks_option_st_idx indicates, after the split by colon, from which index on,
-  // the content should be the options.
-  int toks_option_st_idx = 1;
-  // The max size after the split by colon.
-  int max_num_tokens = 3;
-  if (is_hdfs) {
-    // We force the HDFS path to contain the port number, so the
-    // first ":" should be a part of the hdfs path.
-    if (toks.size() < 2) {
-      return Status(
-          Substitute("Hdfs path should have the port number: '$0'", tmp_dir_spec));
-    }
-    *path = toks[0].append(":").append(toks[1]);
-    toks_option_st_idx++;
-    max_num_tokens++;
-  } else {
-    *path = toks[0];
-  }
-
-  if (toks.size() > max_num_tokens) {
-    return Status(Substitute(
-        "Could not parse temporary dir specifier, too many colons: '$0'", tmp_dir_spec));
-  }
-
-  for (int i = toks_option_st_idx; i < toks.size(); i++) {
-    if (i == toks_option_st_idx) {
-      // Parse option byte_limit.
-      bool is_percent;
-      int64_t tmp_bytes_limit = ParseUtil::ParseMemSpec(toks[i], &is_percent, 0);
-      if (tmp_bytes_limit < 0 || is_percent) {
-        return Status(Substitute(
-            "Malformed scratch directory capacity configuration '$0'", tmp_dir_spec));
-      } else if (tmp_bytes_limit == 0) {
-        // Interpret -1, 0 or empty string as no limit.
-        tmp_bytes_limit = numeric_limits<int64_t>::max();
-      }
-      *bytes_limit = tmp_bytes_limit;
-    } else if (i == (toks_option_st_idx + 1)) {
-      // Parse option priority.
-      if (toks[i].empty()) continue;
-      StringParser::ParseResult result;
-      int tmp_priority =
-          StringParser::StringToInt<int>(toks[i].data(), toks[i].size(), &result);
-      if (result != StringParser::PARSE_SUCCESS) {
-        return Status(Substitute(
-            "Malformed scratch directory priority configuration '$0'", tmp_dir_spec));
-      }
-      *priority = tmp_priority;
-    } else {
-      DCHECK(false) << "Invalid temporary dir specifier: " << tmp_dir_spec;
-    }
-  }
-
-  return Status::OK();
-}
-
-Status TmpFileMgr::CreateDirectory(const string& scratch_subdir_path,
-    const string& tmp_path, const std::unique_ptr<TmpDir>& tmp_dir, MetricGroup* metrics,
-    vector<bool>* is_tmp_dir_on_disk, bool* need_local_buffer_dir, int disk_id) {
-  // Create the directory, destroying if already present. If this succeeds, we will
-  // have an empty writable scratch directory.
-  Status status = FileSystemUtil::RemoveAndCreateDirectory(scratch_subdir_path);
-  if (status.ok()) {
-    if (*need_local_buffer_dir) {
-      *need_local_buffer_dir = false;
-      // Add the first local dir as local buffer, the dir is only served as the buffer for
-      // Spill to Remote FS. At least we need the dir has two default file size space.
-      if (tmp_dir->bytes_limit < tmp_dirs_remote_ctrl_.remote_tmp_file_size_ * 2) {
-        return Status(Substitute(
-            "Local buffer directory $0 configured for remote scratch "
-            "space has a size limit of $1 bytes, should be at least twice as the "
-            "temporary file size "
-            "$2 bytes",
-            tmp_dir->path, tmp_dir->bytes_limit,
-            tmp_dirs_remote_ctrl_.remote_tmp_file_size_));
-      }
-      IntGauge* local_buff_bytes_used_metric =
-          metrics->AddGauge(LOCAL_BUFF_BYTES_USED_FORMAT, 0, Substitute("$0", 0));
-      local_buff_dir_ = std::make_unique<TmpDir>(
-          scratch_subdir_path, tmp_dir->bytes_limit, 0, local_buff_bytes_used_metric);
-      return Status::OK();
-    }
-    if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true;
-    LOG(INFO) << "Using scratch directory " << scratch_subdir_path << " on "
-              << "disk " << disk_id
-              << " limit: " << PrettyPrinter::PrintBytes(tmp_dir->bytes_limit);
-    IntGauge* bytes_used_metric = metrics->AddGauge(
-        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size()));
-    tmp_dirs_.emplace_back(
-        scratch_subdir_path, tmp_dir->bytes_limit, tmp_dir->priority, bytes_used_metric);
-  } else {
-    LOG(WARNING) << "Could not remove and recreate directory " << scratch_subdir_path
-                 << ": cannot use it for scratch. "
-                 << "Error was: " << status.msg().msg();
-  }
-  if (punch_holes_) {
-    // Make sure hole punching is supported for the directory.
-    // IMPALA-9798: this file should *not* be created inside impala-scratch
-    // subdirectory to avoid races with multiple impalads starting up.
-    RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(tmp_path));
-  }
-  return Status::OK();
-}
-
 Status TmpFileMgr::CreateTmpFileBufferPoolThread(MetricGroup* metrics) {
   DCHECK(metrics != nullptr);
   tmp_dirs_remote_ctrl_.tmp_file_pool_.reset(new TmpFileBufferPool(this));
@@ -552,7 +392,7 @@ void TmpFileMgr::NewFile(
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
   file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id].path);
+  path new_file_path(tmp_dirs_[device_id]->path_);
   new_file_path /= file_name.str();
 
   new_file->reset(new TmpFileLocal(file_group, device_id, new_file_path.string()));
@@ -560,7 +400,7 @@ void TmpFileMgr::NewFile(
 
 void TmpFileMgr::RemoveRemoteDir(TmpFileGroup* file_group, DeviceId device_id) {
   if (tmp_dirs_remote_ == nullptr) return;
-  string dir = tmp_dirs_remote_->path;
+  string dir = tmp_dirs_remote_->path_;
   stringstream files_dir;
   files_dir << dir << "/" << PrintId(ExecEnv::GetInstance()->backend_id(), "_") << "_"
             << PrintId(file_group->unique_id(), "_");
@@ -636,10 +476,10 @@ Status TmpFileMgr::ReserveLocalBufferSpace(bool quick_return) {
   // the pool now.
   TmpDir* dir = local_buff_dir_.get();
   if (tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(file_size)
-      > dir->bytes_limit) {
+      > dir->bytes_limit_) {
     tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(-file_size);
   } else {
-    GetLocalBufferDir()->bytes_used_metric->Increment(file_size);
+    GetLocalBufferDir()->bytes_used_metric_->Increment(file_size);
     return Status::OK();
   }
 
@@ -694,10 +534,235 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size() + ((tmp_dirs_remote_ == nullptr) ? 0 : 1));
   if (device_id < tmp_dirs_.size()) {
-    return tmp_dirs_[device_id].path;
+    return tmp_dirs_[device_id]->path_;
+  } else {
+    return tmp_dirs_remote_->path_;
+  }
+}
+
+TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local)
+  : raw_path_(raw_path), prefix_(prefix), is_local_dir_(is_local), is_parsed_(false) {}
+
+Status TmpDir::GetPathFromToks(const vector<string>& toks, string* path, int* offset) {
+  DCHECK(path != nullptr);
+  DCHECK(offset != nullptr);
+  string parsed_raw_path = prefix_;
+  // The ordinary format of the directory input after split by colon is
+  // ["path", "bytes_limit", "priority"].
+  parsed_raw_path.append(toks[0]);
+  *path = parsed_raw_path;
+  *offset = 1;
+  return Status::OK();
+}
+
+Status TmpDir::ParsePath(const vector<string>& toks, int* offset) {
+  string parsed_raw_path;
+  RETURN_IF_ERROR(GetPathFromToks(toks, &parsed_raw_path, offset));
+  parsed_raw_path = trim_right_copy_if(parsed_raw_path, is_any_of("/"));
+
+  // Construct the complete scratch directory path.
+  boost::filesystem::path tmp_path(parsed_raw_path);
+  if (is_local_dir_) {
+    tmp_path = absolute(tmp_path);
+    parsed_raw_path = tmp_path.string();
+  }
+  boost::filesystem::path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
+  parsed_raw_path_ = parsed_raw_path;
+  path_ = scratch_subdir_path.string();
+
+  return Status::OK();
+}
+
+Status TmpDir::ParseByteLimit(const vector<string>& toks, int index) {
+  DCHECK_GE(index, 0);
+  int64_t bytes_limit = numeric_limits<int64_t>::max();
+  if (index < toks.size()) {
+    // Parse option byte_limit.
+    bool is_percent;
+    bytes_limit = ParseUtil::ParseMemSpec(toks[index], &is_percent, 0);
+    if (bytes_limit < 0 || is_percent) {
+      return Status(Substitute(
+          "Malformed scratch directory capacity configuration '$0'", raw_path_));
+    } else if (bytes_limit == 0) {
+      // Interpret -1, 0 or empty string as no limit.
+      bytes_limit = numeric_limits<int64_t>::max();
+    }
+  }
+  bytes_limit_ = bytes_limit;
+  return Status::OK();
+}
+
+Status TmpDir::ParsePriority(const vector<string>& toks, int index) {
+  DCHECK_GE(index, 0);
+  int priority = numeric_limits<int>::max();
+  if (index < toks.size() && !toks[index].empty()) {
+    StringParser::ParseResult result;
+    priority =
+        StringParser::StringToInt<int>(toks[index].data(), toks[index].size(), &result);
+    if (result != StringParser::PARSE_SUCCESS) {
+      return Status(Substitute(
+          "Malformed scratch directory priority configuration '$0'", raw_path_));
+    }
+  }
+  priority_ = priority;
+  return Status::OK();
+}
+
+Status TmpDir::ParseTokens() {
+  vector<string> toks;
+  string path_without_prefix = raw_path_.substr(strlen(prefix_.c_str()));
+  split(toks, path_without_prefix, is_any_of(":"), token_compress_off);
+  int offset = 0;
+  RETURN_IF_ERROR(ParsePath(toks, &offset));
+  const int max_num_options = 2;
+  // The max size after the split by colon.
+  int max_num_tokens = max_num_options + offset;
+  if (toks.size() > max_num_tokens) {
+    return Status(Substitute(
+        "Could not parse temporary dir specifier, too many colons: '$0'", raw_path_));
+  }
+  // The scratch path may have two options "bytes limit" and "priority".
+  // The bytes limit should be the first option.
+  RETURN_IF_ERROR(ParseByteLimit(toks, offset++));
+  // The priority should be the second option.
+  RETURN_IF_ERROR(ParsePriority(toks, offset));
+  return Status::OK();
+}
+
+Status TmpDir::Parse() {
+  DCHECK(!is_parsed_);
+  RETURN_IF_ERROR(ParseTokens());
+  is_parsed_ = true;
+  return Status::OK();
+}
+
+Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics,
+    vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
+  DCHECK(is_parsed_);
+  // The path must be a writable directory.
+  Status status = FileSystemUtil::VerifyIsDirectory(parsed_raw_path_);
+  if (!status.ok()) {
+    LOG(WARNING) << "Cannot use directory " << parsed_raw_path_
+                 << " for scratch: " << status.msg().msg();
+    return status;
+  }
+
+  // Find the disk id of path. Add the scratch directory if there isn't another directory
+  // on the same disk (or if we don't know which disk it is on).
+  int disk_id = DiskInfo::disk_id(parsed_raw_path_.c_str());
+  if (!tmp_mgr->one_dir_per_device_ || disk_id < 0 || !(*is_tmp_dir_on_disk)[disk_id]) {
+    uint64_t available_space;
+    RETURN_IF_ERROR(
+        FileSystemUtil::GetSpaceAvailable(parsed_raw_path_, &available_space));
+    if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) {
+      LOG(WARNING) << "Filesystem containing scratch directory " << parsed_raw_path_
+                   << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB
+                   << "MB available.";
+    }
+    RETURN_IF_ERROR(CreateLocalDirectory(
+        metrics, is_tmp_dir_on_disk, need_local_buffer_dir, disk_id, tmp_mgr));
+    if (tmp_mgr->punch_holes_) {
+      // Make sure hole punching is supported for the directory.
+      // IMPALA-9798: this file should *not* be created inside impala-scratch
+      // subdirectory to avoid races with multiple impalads starting up.
+      RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(parsed_raw_path_));
+    }
   } else {
-    return tmp_dirs_remote_->path;
+    return Status(Substitute(
+        "The scratch directory $0 is on the same disk with another directory or on "
+        "an unknown disk.",
+        parsed_raw_path_));
   }
+  return Status::OK();
+}
+
+Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics,
+    vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id,
+    TmpFileMgr* tmp_mgr) {
+  // Create the directory, destroying if already present. If this succeeds, we will
+  // have an empty writable scratch directory.
+  Status status = FileSystemUtil::RemoveAndCreateDirectory(path_);
+  if (status.ok()) {
+    if (need_local_buffer_dir) {
+      // Add the first local dir as the local buffer; the dir only serves as the
+      // buffer for spilling to a remote filesystem. The dir needs space for at
+      // least two files of the default size.
+      if (bytes_limit_ < tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_ * 2) {
+        return Status(Substitute(
+            "Local buffer directory $0 configured for remote scratch "
+            "space has a size limit of $1 bytes, should be at least twice as the "
+            "temporary file size "
+            "$2 bytes",
+            path_, bytes_limit_, tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_));
+      }
+      bytes_used_metric_ =
+          metrics->AddGauge(LOCAL_BUFF_BYTES_USED_FORMAT, 0, Substitute("$0", 0));
+      return Status::OK();
+    }
+    if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true;
+    LOG(INFO) << "Using scratch directory " << path_ << " on "
+              << "disk " << disk_id
+              << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_);
+    bytes_used_metric_ = metrics->AddGauge(
+        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_mgr->tmp_dirs_.size()));
+  } else {
+    LOG(WARNING) << "Could not remove and recreate directory " << path_
+                 << ": cannot use it for scratch. "
+                 << "Error was: " << status.msg().msg();
+  }
+  return status;
+}
+
+Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+    bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
+  // For the S3 path, it doesn't need to create the directory for the uploading
+  // as long as the S3 address is correct.
+  DCHECK(is_parsed_);
+  hdfsFS hdfs_conn;
+  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+      parsed_raw_path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()));
+  return Status::OK();
+}
+
+Status TmpDirHdfs::GetPathFromToks(
+    const vector<string>& toks, string* parsed_path, int* offset) {
+  DCHECK(parsed_path != nullptr);
+  DCHECK(offset != nullptr);
+  // We require the HDFS scratch path to include a port number, so the format after
+  // splitting by colon is ["path", "port_num", "bytes_limit", "priority"]; therefore
+  // the offset to be returned is 2.
+  if (toks.size() < 2) {
+    return Status(
+        Substitute("The scrach path should have the port number: '$0'", raw_path_));
+  }
+  string parsed_raw_path = prefix_;
+  parsed_raw_path.append(toks[0]).append(":").append(toks[1]);
+  *parsed_path = parsed_raw_path;
+  *offset = 2;
+  return Status::OK();
+}
+
+Status TmpDirHdfs::Parse() {
+  DCHECK(!is_parsed_);
+  RETURN_IF_ERROR(ParseTokens());
+  is_parsed_ = true;
+  return Status::OK();
+}
+
+Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk,
+    bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) {
+  DCHECK(is_parsed_);
+  hdfsFS hdfs_conn;
+  // If the HDFS path doesn't exist, it would fail while uploading, so we
+  // create the HDFS path if it doesn't exist.
+  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+      parsed_raw_path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_)));
+  if (hdfsExists(hdfs_conn, path_.c_str()) != 0) {
+    if (hdfsCreateDirectory(hdfs_conn, path_.c_str()) != 0) {
+      return Status(GetHdfsErrorMsg("HDFS create path failed: ", path_));
+    }
+  }
+  return Status::OK();
 }
 
 TmpFile::TmpFile(
@@ -730,14 +795,14 @@ bool TmpFile::Blacklist(const ErrorMsg& msg) {
   }
 }
 
-TmpFileMgr::TmpDir* TmpFile::GetDir() {
+TmpDir* TmpFile::GetDir() {
   auto tmp_file_mgr = file_group_->tmp_file_mgr_;
   if (device_id_ >= tmp_file_mgr->tmp_dirs_.size()) {
     // Only one remote directory supported.
     DCHECK(device_id_ - tmp_file_mgr->tmp_dirs_.size() == 0);
     return tmp_file_mgr->tmp_dirs_remote_.get();
   }
-  return &tmp_file_mgr->tmp_dirs_[device_id_];
+  return tmp_file_mgr->tmp_dirs_[device_id_].get();
 }
 
 Status TmpFile::PunchHole(int64_t offset, int64_t len) {
@@ -751,7 +816,7 @@ Status TmpFile::PunchHole(int64_t offset, int64_t len) {
   KUDU_RETURN_IF_ERROR(
       file->PunchHole(offset, len), "Failed to punch hole in scratch file");
   bytes_reclaimed_.Add(len);
-  GetDir()->bytes_used_metric->Increment(-len);
+  GetDir()->bytes_used_metric()->Increment(-len);
   VLOG(3) << "Punched hole in " << path_ << " " << offset << " " << len;
   return Status::OK();
 }
@@ -774,8 +839,8 @@ bool TmpFileLocal::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
   TmpDir* dir = GetDir();
   // Increment optimistically and roll back if the limit is exceeded.
-  if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) {
-    dir->bytes_used_metric->Increment(-num_bytes);
+  if (dir->bytes_used_metric()->Increment(num_bytes) > dir->bytes_limit()) {
+    dir->bytes_used_metric()->Increment(-num_bytes);
     return false;
   }
   *offset = allocation_offset_;
@@ -793,7 +858,7 @@ Status TmpFileLocal::Remove() {
   int64_t bytes_in_use = file_group_->tmp_file_mgr_->punch_holes() ?
       allocation_offset_ - bytes_reclaimed_.Load() :
       allocation_offset_;
-  GetDir()->bytes_used_metric->Increment(-bytes_in_use);
+  GetDir()->bytes_used_metric()->Increment(-bytes_in_use);
   return status;
 }
 
@@ -845,7 +910,7 @@ io::DiskFile* TmpFileRemote::GetWriteFile() {
   return disk_buffer_file_.get();
 }
 
-TmpFileMgr::TmpDir* TmpFileRemote::GetLocalBufferDir() const {
+TmpDir* TmpFileRemote::GetLocalBufferDir() const {
   return file_group_->tmp_file_mgr_->GetLocalBufferDir();
 }
 
@@ -883,7 +948,7 @@ Status TmpFileRemote::Remove() {
   disk_file_->SetStatus(io::DiskFileStatus::DELETED);
 
   // Update the metrics.
-  GetDir()->bytes_used_metric->Increment(-file_size_);
+  GetDir()->bytes_used_metric()->Increment(-file_size_);
 
   // Return the file to the pool if it hasn't been enqueued.
   if (to_return_the_buffer) {
@@ -923,13 +988,13 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
   DCHECK(tmp_file_mgr != nullptr);
   io_ctx_ = io_mgr_->RegisterContext();
   // Populate the priority based index ranges.
-  const std::vector<TmpDir>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
+  const std::vector<std::unique_ptr<TmpDir>>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
   if (tmp_dirs.size() > 0) {
     int start_index = 0;
-    int priority = tmp_dirs[0].priority;
+    int priority = tmp_dirs[0]->priority();
     for (int i = 0; i < tmp_dirs.size() - 1; ++i) {
-      priority = tmp_dirs[i].priority;
-      const int next_priority = tmp_dirs[i+1].priority;
+      priority = tmp_dirs[i]->priority();
+      const int next_priority = tmp_dirs[i + 1]->priority();
       if (next_priority != priority) {
         tmp_files_index_range_.emplace(priority, TmpFileIndexRange(start_index, i));
         start_index = i + 1;
@@ -1025,7 +1090,7 @@ void TmpFileGroup::UpdateScratchSpaceMetrics(int64_t num_bytes, bool is_remote)
   if (is_remote) current_bytes_allocated_remote_.Add(num_bytes);
 }
 
-string TmpFileGroup::GenerateNewPath(string& dir, string& unique_name) {
+string TmpFileGroup::GenerateNewPath(const string& dir, const string& unique_name) {
   stringstream file_name;
   file_name << TMP_SUB_DIR_NAME << "-" << unique_name;
   path new_file_path(dir);
@@ -1045,7 +1110,7 @@ std::shared_ptr<TmpFile>& TmpFileGroup::FindTmpFileSharedPtr(TmpFile* tmp_file)
 Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
     int64_t* file_offset, vector<int>* at_capacity_dirs) {
   // Only one remote dir supported currently.
-  string dir = tmp_file_mgr_->tmp_dirs_remote_->path;
+  string dir = tmp_file_mgr_->tmp_dirs_remote_->path();
   // It is not supposed to have a remote directory other than HDFS or S3.
   DCHECK(IsHdfsPath(dir.c_str(), false) || IsS3APath(dir.c_str(), false));
 
@@ -1069,7 +1134,7 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_, GetBackendString());
   }
 
-  int64_t remote_dir_bytes_limit = tmp_file_mgr_->tmp_dirs_remote_->bytes_limit;
+  int64_t remote_dir_bytes_limit = tmp_file_mgr_->tmp_dirs_remote_->bytes_limit();
   if (remote_dir_bytes_limit != -1 && new_bytes > remote_dir_bytes_limit) {
     return Status(
         TErrorCode::SCRATCH_LIMIT_EXCEEDED, remote_dir_bytes_limit, GetBackendString());
@@ -1085,7 +1150,7 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
       + PrintId(unique_id(), "_");
 
   string new_file_path = GenerateNewPath(dir, unique_name);
-  string local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path;
+  const string& local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path();
   string new_file_path_local = GenerateNewPath(local_buffer_dir, unique_name);
 
   TmpFileRemote* tmp_file_r = new TmpFileRemote(
@@ -1098,14 +1163,14 @@ Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file,
   }
   shared_ptr<TmpFile> tmp_file_remote(move(tmp_file_r));
   int64_t file_size = tmp_file_mgr_->GetRemoteTmpFileSize();
-  TmpFileMgr::TmpDir* tmp_dir_remote = tmp_file_remote->GetDir();
-  if (tmp_dir_remote->bytes_limit != -1
-      && tmp_dir_remote->bytes_used_metric->Increment(file_size)
-          > tmp_dir_remote->bytes_limit) {
-    tmp_dir_remote->bytes_used_metric->Increment(-file_size);
+  TmpDir* tmp_dir_remote = tmp_file_remote->GetDir();
+  if (tmp_dir_remote->bytes_limit() != -1
+      && tmp_dir_remote->bytes_used_metric()->Increment(file_size)
+          > tmp_dir_remote->bytes_limit()) {
+    tmp_dir_remote->bytes_used_metric()->Increment(-file_size);
     at_capacity_dirs->push_back(dev_id);
     return Status(Substitute("Reach the size limit $0 of dir: $1",
-        tmp_dir_remote->bytes_limit, tmp_dir_remote->path));
+        tmp_dir_remote->bytes_limit(), tmp_dir_remote->path()));
   }
   UpdateScratchSpaceMetrics(file_size, true);
   tmp_files_remote_.emplace_back(move(tmp_file_remote));
@@ -1473,15 +1538,15 @@ Status TmpFileGroup::RecoverWriteError(
 Status TmpFileGroup::ScratchAllocationFailedStatus(
     const vector<int>& at_capacity_dirs) {
   vector<string> tmp_dir_paths;
-  for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
-    tmp_dir_paths.push_back(tmp_dir.path);
+  for (std::unique_ptr<TmpDir>& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
+    tmp_dir_paths.push_back(tmp_dir->path());
   }
   vector<string> at_capacity_dir_paths;
   for (int dir_idx : at_capacity_dirs) {
     if (dir_idx >= tmp_file_mgr_->tmp_dirs_.size()) {
-      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_remote_->path);
+      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_remote_->path());
     } else {
-      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path);
+      at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx]->path());
     }
   }
   Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","),
@@ -1994,7 +2059,7 @@ void TmpFileBufferPool::EnqueueTmpFilesPool(shared_ptr<TmpFile>& tmp_file, bool
     } else {
       tmp_files_avail_pool_.push_back(tmp_file);
     }
-    tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric->Increment(
+    tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment(
         -1 * tmp_file_mgr_->GetRemoteTmpFileSize());
   }
   tmp_files_available_cv_.NotifyOne();
@@ -2030,7 +2095,7 @@ Status TmpFileBufferPool::DequeueTmpFilesPool(
     DCHECK(tmp_file_remote->is_enqueued());
     tmp_file_remote->SetEnqueued(false);
   }
-  tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric->Increment(
+  tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment(
       tmp_file_mgr_->GetRemoteTmpFileSize());
   return Status::OK();
 }
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index bb459d9..9f3691c 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -54,6 +54,7 @@ namespace io {
 }
 struct BufferPoolClientCounters;
 class MemTracker;
+class TmpDir;
 class TmpFile;
 class TmpFileRemote;
 class TmpFileBufferPool;
@@ -132,33 +133,6 @@ class TmpFileMgr {
   /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;
 
-  /// A configured temporary directory that TmpFileMgr allocates files in.
-  struct TmpDir {
-    TmpDir(const std::string& path, int64_t bytes_limit, int priority,
-        IntGauge* bytes_used_metric, bool is_local_dir = true)
-      : path(path),
-        bytes_limit(bytes_limit),
-        priority(priority),
-        bytes_used_metric(bytes_used_metric),
-        is_local_dir(is_local_dir) {}
-
-    /// Path to the temporary directory.
-    std::string path;
-
-    /// Limit on bytes that should be written to this path. Set to maximum value
-    /// of int64_t if there is no limit.
-    int64_t bytes_limit;
-
-    /// Scratch directory priority.
-    int priority;
-
-    /// The current bytes of scratch used for this temporary directory.
-    IntGauge* bytes_used_metric;
-
-    /// If the dir is expected in the local file system or in the remote.
-    bool is_local_dir;
-  };
-
   /// A configuration for the control parameters of remote temporary directories.
   /// The struct is used by TmpFileMgr and has the same lifecycle as TmpFileMgr.
   struct TmpDirRemoteCtrl {
@@ -208,18 +182,6 @@ class TmpFileMgr {
   Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
       bool one_dir_per_device, const std::string& compression_codec, bool punch_holes,
       MetricGroup* metrics) WARN_UNUSED_RESULT;
-
-  /// A helper function for InitCustom() to parse the options of the scratch directory.
-  Status ParseScratchPathToks(const string& tmp_dir_spec,
-      const string& tmp_dirs_without_prefix, bool is_hdfs, string* path,
-      int64_t* bytes_limit, int* priority) WARN_UNUSED_RESULT;
-
-  /// A helper function for InitCustom() to create a scratch directory.
-  Status CreateDirectory(const string& scratch_subdir_path, const string& tmp_path,
-      const std::unique_ptr<TmpDir>& tmp_dir, MetricGroup* metrics,
-      vector<bool>* is_tmp_dir_on_disk, bool* has_remote_dir,
-      int disk_id) WARN_UNUSED_RESULT;
-
   // Create the TmpFile buffer pool thread for async buffer file reservation.
   Status CreateTmpFileBufferPoolThread(MetricGroup* metrics) WARN_UNUSED_RESULT;
 
@@ -328,6 +290,9 @@ class TmpFileMgr {
   friend class TmpFileRemote;
   friend class TmpFileGroup;
   friend class TmpFileMgrTest;
+  friend class TmpDirLocal;
+  friend class TmpDirHdfs;
+  friend class TmpDirS3;
 
   /// Return a new TmpFile handle with a path based on file_group->unique_id. The file is
   /// associated with the 'file_group' and the file path is within the (single) scratch
@@ -353,9 +318,12 @@ class TmpFileMgr {
   /// Whether hole punching is enabled.
   bool punch_holes_ = false;
 
+  /// Whether one local scratch directory per device.
+  bool one_dir_per_device_ = false;
+
   /// The paths of the created tmp directories, which are used for spilling to local
   /// filesystem.
-  std::vector<TmpDir> tmp_dirs_;
+  std::vector<std::unique_ptr<TmpDir>> tmp_dirs_;
 
   /// The paths of remote directories, which are used for spilling to remote filesystem.
   std::unique_ptr<TmpDir> tmp_dirs_remote_;
@@ -469,7 +437,7 @@ class TmpFileGroup {
   void UpdateScratchSpaceMetrics(int64_t num_bytes, bool is_remote = false);
 
   /// Assemble and return a new path.
-  std::string GenerateNewPath(string& dir, string& unique_name);
+  std::string GenerateNewPath(const string& dir, const string& unique_name);
 
   std::string DebugString();