You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/10/26 12:39:59 UTC

hive git commit: HIVE-20661: Dynamic partitions loading calls add partition for every partition 1-by-1 (Laszlo Pinter via Peter Vary)

Repository: hive
Updated Branches:
  refs/heads/master 50a96d77e -> 0d7015468

HIVE-20661: Dynamic partitions loading calls add partition for every partition 1-by-1 (Laszlo Pinter via Peter Vary)


Branch: refs/heads/master
Commit: 0d7015468611c485c01bb80cdbe4c89e4e68b680
Parents: 50a96d7
Author: Laszlo Pinter <>
Authored: Fri Oct 26 13:43:53 2018 +0200
Committer: Peter Vary <>
Committed: Fri Oct 26 13:43:53 2018 +0200

 .../metastore/SynchronizedMetaStoreClient.java  |   4 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 495 +++++++++++++------
 .../ql/metadata/SessionHiveMetaStoreClient.java |  52 +-
 .../hive/metastore/     |   2 +-
 4 files changed, 398 insertions(+), 155 deletions(-)
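diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index 0ab77e8..e8f3623 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java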
@@ -78,6 +78,10 @@ public final class SynchronizedMetaStoreClient {
     return client.add_partition(partition);
+  public synchronized int add_partitions(List<Partition> partitions) throws TException {
+    return client.add_partitions(partitions);
+  }
   public synchronized void alter_partition(String catName, String dbName, String tblName,
       Partition newPart, EnvironmentContext environmentContext, String writeIdList) throws TException {
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4de0389..012a670 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -45,6 +45,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -96,18 +97,17 @@ import;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -1866,17 +1866,102 @@ public class Hive {
       boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId,
       int stmtId, boolean isInsertOverwrite) throws HiveException {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
+    // Get the partition object if it already exists
+    Partition oldPart = getPartition(tbl, partSpec, false);
+    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+    // If config is set, table is not temporary and partition being inserted exists, capture
+    // the list of files added. For not yet existing partitions (insert overwrite to new partition
+    // or dynamic partition inserts), the add partition event will capture the list of files added.
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
+    Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+            loadFileType, inheritTableSpecs,
+            inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+            hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+    AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+            newTPart.getTable(), true);
+    if (tableSnapshot != null) {
+      newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
+    }
+    if (oldPart == null) {
+      addPartitionToMetastore(newTPart, hasFollowingStatsTask, tbl, tableSnapshot);
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
+      if (isTxnTable && (null != newFiles)) {
+        addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+      }
+    } else {
+      try {
+        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+      } catch (TException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new HiveException(e);
+      }
+    }
+    perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
+    return newTPart;
+  }
+  /**
+   * Move all the files from loadPath into Hive. If the partition
+   * does not exist - one is created - files in loadPath are moved into Hive. But the
+   * directory itself is not removed.
+   *
+   * @param loadPath
+   *          Directory containing files to load into Table
+   * @param tbl
+   *          name of table to be loaded.
+   * @param partSpec
+   *          defines which partition needs to be loaded
+   * @param oldPart
+   *          already existing partition object, can be null
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
+   * @param inheritTableSpecs if true, on [re]creating the partition, take the
+   *          location/inputformat/outputformat/serde details from table spec
+   * @param inheritLocation
+   *          if true, partition path is generated from table
+   * @param isSkewedStoreAsSubdir
+   *          if true, skewed is stored as sub-directory
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
+   * @param isAcidIUDoperation
+   *          true if this is an ACID operation Insert/Update/Delete operation
+   * @param hasFollowingStatsTask
+   *          true if there is a following task which updates the stats, so, this method need not update.
+   * @param writeId
+   *          write ID allocated for the current load operation
+   * @param stmtId
+   *          statement ID of the current load statement
+   * @param isInsertOverwrite
+   * @param isTxnTable
+   *
+   * @return Partition object being loaded with data
+   * @throws HiveException
+   */
+  private Partition loadPartitionInternal(Path loadPath, Table tbl, Map<String, String> partSpec,
+                        Partition oldPart, LoadFileType loadFileType, boolean inheritTableSpecs,
+                        boolean inheritLocation, boolean isSkewedStoreAsSubdir,
+                        boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+                        Long writeId, int stmtId, boolean isInsertOverwrite,
+                        boolean isTxnTable, List<Path> newFiles) throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
-    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     try {
       PerfLogger perfLogger = SessionState.getPerfLogger();
-      perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
-      // Get the partition object if it already exists
-      Partition oldPart = getPartition(tbl, partSpec, false);
        * Move files before creating the partition since down stream processes
        * check for existence of partition in metadata before accessing the data.
@@ -1908,17 +1993,9 @@ public class Hive {
         newPartPath = oldPartPath == null
           ? genPartPathFromTable(tbl, partSpec, tblDataLocationPath) : oldPartPath;
-      List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
-      // If config is set, table is not temporary and partition being inserted exists, capture
-      // the list of files added. For not yet existing partitions (insert overwrite to new partition
-      // or dynamic partition inserts), the add partition event will capture the list of files added.
-      if (areEventsForDmlNeeded(tbl, oldPart)) {
-        newFiles = Collections.synchronizedList(new ArrayList<Path>());
-      }
       // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
       //       like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
       //       to ACID updates. So the are not themselves ACID.
@@ -1975,11 +2052,6 @@ public class Hive {
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
-      AcidUtils.TableSnapshot tableSnapshot = null;
-      tableSnapshot = AcidUtils.getTableSnapshot(conf, newTPart.getTable(), true);
-      if (tableSnapshot != null) {
-        newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
-      }
       // If config is set, table is not temporary and partition being inserted exists, capture
       // the list of files added. For not yet existing partitions (insert overwrite to new partition
@@ -2037,55 +2109,96 @@ public class Hive {
           // The ACID state is probably absent. Warning is logged in the get method.
-        try {
-          LOG.debug("Adding new partition " + newTPart.getSpec());
-          getSynchronizedMSC().add_partition(newTPart.getTPartition());
-        } catch (AlreadyExistsException aee) {
-          // With multiple users concurrently issuing insert statements on the same partition has
-          // a side effect that some queries may not see a partition at the time when they're issued,
-          // but will realize the partition is actually there when it is trying to add such partition
-          // to the metastore and thus get AlreadyExistsException, because some earlier query just created it (race condition).
-          // For example, imagine such a table is created:
-          //  create table T (name char(50)) partitioned by (ds string);
-          // and the following two queries are launched at the same time, from different sessions:
-          //  insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
-          //  insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
-          // In that case, we want to retry with alterPartition.
-          LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
-          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
-        } catch (Exception e) {
-          try {
-            final FileSystem newPathFileSystem = newPartPath.getFileSystem(this.getConf());
-            boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-            final FileStatus status = newPathFileSystem.getFileStatus(newPartPath);
-            Hive.trashFiles(newPathFileSystem, new FileStatus[] {status}, this.getConf(), isAutoPurge);
-          } catch (IOException io) {
-            LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
-          }
-          throw e;
-        }
-        // For acid table, add the acid_write event with file list at the time of load itself. But
-        // it should be done after partition is created.
-        if (isTxnTable && (null != newFiles)) {
-          addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
-        }
-      } else {
-        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
-      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
       return newTPart;
-    } catch (IOException e) {
+    } catch (IOException | MetaException | InvalidOperationException e) {
       throw new HiveException(e);
-    } catch (MetaException e) {
-      LOG.error(StringUtils.stringifyException(e));
-      throw new HiveException(e);
-    } catch (InvalidOperationException e) {
+    }
+  }
+  private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask,
+                                       Table tbl, TableSnapshot tableSnapshot) throws HiveException{
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding new partition " + newTPart.getSpec());
+      }
+      getSynchronizedMSC().add_partition(newTPart.getTPartition());
+    } catch (AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition has
+      // a side effect that some queries may not see a partition at the time when they're issued,
+      // but will realize the partition is actually there when it is trying to add such partition
+      // to the metastore and thus get AlreadyExistsException, because some earlier query just
+      // created it (race condition).
+      // For example, imagine such a table is created:
+      //  create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      //  insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      //  insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+      LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
+      try {
+        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+      } catch (TException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new HiveException(e);
+      }
+    } catch (Exception e) {
+      try {
+        final FileSystem newPathFileSystem = newTPart.getPartitionPath().getFileSystem(this.getConf());
+        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+        final FileStatus status = newPathFileSystem.getFileStatus(newTPart.getPartitionPath());
+        Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+      } catch (IOException io) {
+        LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+      }
       throw new HiveException(e);
-    } catch (TException e) {
+    }
+  }
+  private void addPartitionsToMetastore(List<Partition> partitions,
+                                        boolean hasFollowingStatsTask, Table tbl,
+                                        List<AcidUtils.TableSnapshot> tableSnapshots)
+                                        throws HiveException {
+    try {
+      if (LOG.isDebugEnabled()) {
+        StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
+        partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
+        LOG.debug(debugMsg.toString());
+      }
+      getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+              .collect(Collectors.toList()));
+    } catch(AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition has
+      // a side effect that some queries may not see a partition at the time when they're issued,
+      // but will realize the partition is actually there when it is trying to add such partition
+      // to the metastore and thus get AlreadyExistsException, because some earlier query just
+      // created it (race condition).
+      // For example, imagine such a table is created:
+      //  create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      //  insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      //  insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+      LOG.debug("Caught AlreadyExistsException, trying to add partitions one by one.");
+      assert partitions.size() == tableSnapshots.size();
+      for (int i = 0; i < partitions.size(); i++) {
+        addPartitionToMetastore(partitions.get(i), hasFollowingStatsTask, tbl,
+                tableSnapshots.get(i));
+      }
+    } catch (Exception e) {
+      try {
+        for (Partition partition : partitions) {
+          final FileSystem newPathFileSystem = partition.getPartitionPath().getFileSystem(this.getConf());
+          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+          final FileStatus status = newPathFileSystem.getFileStatus(partition.getPartitionPath());
+          Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+        }
+      } catch (IOException io) {
+        LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+      }
       throw new HiveException(e);
@@ -2375,16 +2488,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
-    final Map<Map<String, String>, Partition> partitionsMap =
-        Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
-    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
-    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat("load-dynamic-partitions-%d")
-                .build());
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
     final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
@@ -2392,113 +2495,193 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final int partsToLoad = validPartitions.size();
     final AtomicInteger partitionsLoaded = new AtomicInteger(0);
     final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
         && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
     final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
     final SessionState parentSession = SessionState.get();
+    List<Callable<Partition>> tasks = Lists.newLinkedList();
-    final List<Future<Void>> futures = Lists.newLinkedList();
-    // for each dynamically created DP directory, construct a full partition spec
-    // and load the partition based on that
-    final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
-    try {
-      for(final Path partPath : validPartitions) {
-        // generate a full partition specification
-        final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
-        if (!Warehouse.makeSpecFromName(
-            fullPartSpec, partPath, new HashSet<String>(partSpec.keySet()))) {
-          Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
-          continue;
-        }
-        futures.add(pool.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            try {
-              // move file would require session details (needCopy() invokes SessionState.get)
-              SessionState.setCurrentSessionState(parentSession);
-              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
-              // load the partition
-              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
-                  true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
-                  isInsertOverwrite);
-              partitionsMap.put(fullPartSpec, newPartition);
-              if (inPlaceEligible) {
-                synchronized (ps) {
-                  InPlaceUpdate.rePositionCursor(ps);
-                  partitionsLoaded.incrementAndGet();
-                  InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
-                      + partsToLoad + " partitions.");
-                }
-              }
-              return null;
-            } catch (Exception t) {
-              LOG.error("Exception when loading partition with parameters "
-                  + " partPath=" + partPath + ", "
+    final class PartitionDetails {
+      Map<String, String> fullSpec;
+      Partition partition;
+      List<Path> newFiles;
+      boolean hasOldPartition = false;
+      AcidUtils.TableSnapshot tableSnapshot;
+    }
+    Map<Path, PartitionDetails> partitionDetailsMap =
+            Collections.synchronizedMap(new LinkedHashMap<>());
+    // calculate full path spec for each valid partition path
+    validPartitions.forEach(partPath -> {
+      Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+      if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+        Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+      } else {
+        PartitionDetails details = new PartitionDetails();
+        details.fullSpec = fullPartSpec;
+        partitionDetailsMap.put(partPath, details);
+      }
+    });
+    // fetch all the partitions matching the part spec using the partition iterable
+    // this way the maximum batch size configuration parameter is considered
+    PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+              conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+    Iterator<Partition> iterator = partitionIterable.iterator();
+    // Match valid partition path to partitions
+    while (iterator.hasNext()) {
+      Partition partition = iterator.next();
+      partitionDetailsMap.entrySet().parallelStream()
+              .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
+              .findAny().ifPresent(entry -> {
+                entry.getValue().partition = partition;
+                entry.getValue().hasOldPartition = true;
+              });
+    }
+    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+    for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+      tasks.add(() -> {
+        PartitionDetails partitionDetails = entry.getValue();
+        Map<String, String> fullPartSpec = partitionDetails.fullSpec;
+        try {
+          SessionState.setCurrentSessionState(parentSession);
+          LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+          List<Path> newFiles = Lists.newArrayList();
+          Partition oldPartition = partitionDetails.partition;
+          // load the partition
+          Partition partition = loadPartitionInternal(entry.getKey(), tbl,
+                  fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
+                  hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+          // if the partition already existed before the loading, no need to add it again to the
+          // metastore
+          AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+                  partition.getTable(), true);
+          if (tableSnapshot != null) {
+            partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
+          }
+          partitionDetails.tableSnapshot = tableSnapshot;
+          if (oldPartition == null) {
+            partitionDetails.newFiles = newFiles;
+            partitionDetails.partition = partition;
+          }
+          if (inPlaceEligible) {
+            synchronized (ps) {
+              InPlaceUpdate.rePositionCursor(ps);
+              partitionsLoaded.incrementAndGet();
+              InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                  + partsToLoad + " partitions.");
+            }
+          }
+          return partition;
+        } catch (Exception e) {
+          LOG.error("Exception when loading partition with parameters "
+                  + " partPath=" + entry.getKey() + ", "
                   + " table=" + tbl.getTableName() + ", "
                   + " partSpec=" + fullPartSpec + ", "
                   + " loadFileType=" + loadFileType.toString() + ", "
                   + " listBucketingLevel=" + numLB + ", "
                   + " isAcid=" + isAcid + ", "
-                  + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
-              throw t;
-            } finally {
-              // Add embedded rawstore, so we can cleanup later to avoid memory leak
-              if (getMSC().isLocalMetaStore()) {
-                Long threadId = Thread.currentThread().getId();
-                RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
-                if (threadLocalRawStore == null) {
-                  // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap.
-                  rawStoreMap.remove(threadId);
-                } else {
-                  // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore.
-                  // So, overwrite the old one if exists in rawStoreMap.
-                  rawStoreMap.put(threadId, threadLocalRawStore);
-                }
-              }
-            }
-          }
-        }));
-      }
-      pool.shutdown();
-      LOG.debug("Number of partitions to be added is " + futures.size());
+                  + " hasFollowingStatsTask=" + hasFollowingStatsTask, e);
+          throw e;
+        }
+      });
+    }
-      for (Future future : futures) {
-        future.get();
+    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+    ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitionsToAdd-%d").build());
+    List<Future<Partition>> futures = Lists.newLinkedList();
+    Map<Map<String, String>, Partition> result = Maps.newLinkedHashMap();
+    try {
+      futures = executor.invokeAll(tasks);
+      LOG.debug("Number of partitionsToAdd to be added is " + futures.size());
+      for (Future<Partition> future : futures) {
+        Partition partition = future.get();
+        result.put(partition.getSpec(), partition);
+      }
+      // add new partitions in batch
+      addPartitionsToMetastore(
+              partitionDetailsMap.entrySet()
+                      .parallelStream()
+                      .filter(entry -> !entry.getValue().hasOldPartition)
+                      .map(entry -> entry.getValue().partition)
+                      .collect(Collectors.toList()),
+              hasFollowingStatsTask,
+              tbl,
+              partitionDetailsMap.entrySet()
+                      .parallelStream()
+                      .filter(entry -> !entry.getValue().hasOldPartition)
+                      .map(entry -> entry.getValue().tableSnapshot)
+                      .collect(Collectors.toList()));
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
+      for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+        PartitionDetails partitionDetails = entry.getValue();
+        if (isTxnTable && partitionDetails.newFiles != null) {
+          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+        }
+        if (partitionDetails.hasOldPartition) {
+          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
+                  partitionDetails.tableSnapshot);
+        }
     } catch (InterruptedException | ExecutionException e) {
-      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
-      //cancel other futures
-      for (Future future : futures) {
-        future.cancel(true);
-      }
-      throw new HiveException("Exception when loading "
-          + partsToLoad + " in table " + tbl.getTableName()
-          + " with loadPath=" + loadPath, e);
+      throw new HiveException("Exception when loading " + validPartitions.size()
+              + " in table " + tbl.getTableName()
+              + " with loadPath=" + loadPath);
+    } catch (TException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
+    } catch (Exception e) {
+      StringBuffer logMsg = new StringBuffer();
+      logMsg.append("Exception when loading partitionsToAdd with parameters ");
+      logMsg.append("partPaths=");
+      validPartitions.forEach(path -> logMsg.append(path + ", "));
+      logMsg.append("table=" + tbl.getTableName() + ", ").
+              append("partSpec=" + partSpec + ", ").
+              append("loadFileType=" + loadFileType.toString() + ", ").
+              append("listBucketingLevel=" + numLB + ", ").
+              append("isAcid=" + isAcid + ", ").
+              append("hasFollowingStatsTask=" + hasFollowingStatsTask);
+      LOG.error(logMsg.toString(), e);
+      throw e;
     } finally {
-      rawStoreMap.forEach((k, rs) -> rs.shutdown());
+      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      executor.shutdownNow();
     try {
       if (isAcid) {
-        List<String> partNames = new ArrayList<>(partitionsMap.size());
-        for (Partition p : partitionsMap.values()) {
-          partNames.add(p.getName());
-        }
+        List<String> partNames =
+                result.values().parallelStream().map(Partition::getName).collect(Collectors.toList());
         getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
                 tbl.getDbName(), tbl.getTableName(), partNames,
-      LOG.info("Loaded " + partitionsMap.size() + " partitions");
+      LOG.info("Loaded " + result.size() + "partitionsToAdd");
       perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
-      return partitionsMap;
+      return result;
     } catch (TException te) {
+      LOG.error(StringUtils.stringifyException(te));
       throw new HiveException("Exception updating metastore for acid table "
-          + tableName + " with partitions " + partitionsMap.values(), te);
+          + tableName + " with partitions " + result.values(), te);
@@ -3184,6 +3367,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
     List<String> names = null;
     try {
       names = getMSC().listPartitionNames(dbName, tblName, max);
+    } catch (NoSuchObjectException nsoe) {
+      // this means no partition exists for the given dbName and tblName -
+      // thrift cannot handle null return values, hence
+      // listPartitionNames() throws NoSuchObjectException to indicate null partitions
+      return Lists.newArrayList();
     } catch (Exception e) {
       throw new HiveException(e);
@@ -3200,6 +3388,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
+    } catch (NoSuchObjectException nsoe) {
+      // this means no partition exists for the given partition spec
+      // key value pairs - thrift cannot handle null return values, hence
+      // listPartitionNames() throws NoSuchObjectException to indicate null partitions
+      return Lists.newArrayList();
     } catch (Exception e) {
       throw new HiveException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/
index dd23d7d..322b580 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/
@@ -901,14 +901,23 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     private Partition getPartition(String partName) throws MetaException {
       return pTree.getPartition(partName);
+    /**
+     * Bulk counterpart of single-partition creation for this temp table: first verifies that
+     * the table is partitioned (assertPartitioned() throws MetaException otherwise), then hands
+     * the whole list to the partition tree.
+     * @param partitions the partitions to register
+     * @return the count reported by the partition tree
+     */
+    private int addPartitions(List<Partition> partitions)
+            throws AlreadyExistsException, MetaException {
+      assertPartitioned();
+      final int added = pTree.addPartitions(partitions);
+      return added;
+    }
     private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
       return pTree.getPartitions(partialPartVals);
     private void assertPartitioned() throws MetaException {
       if(tTable.getPartitionKeysSize() <= 0) {
         throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned");
@@ -939,6 +948,17 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
       private Partition getPartition(String partName) {
         return parts.get(partName);
+      /**
+       * Adds the given partitions one after another through addPartition(Partition). An
+       * exception from any single add propagates immediately; partitions handled before it
+       * are not rolled back here.
+       * @param partitions the partitions to add
+       * @return the number of partitions added, i.e. the size of the list when every add succeeds
+       */
+      private int addPartitions(List<Partition> partitions)
+              throws AlreadyExistsException, MetaException {
+        for (Partition newPart : partitions) {
+          addPartition(newPart);
+        }
+        // Reaching this point means every element was added, so the count is the list size.
+        return partitions.size();
+      }
        * Provided values for the 1st N partition columns, will return all matching PartitionS
        * The list is a partial list of partition values in the same order as partition columns.
@@ -960,9 +980,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
-   * Loading Dynamic Partitons calls this.
-   * Hive.loadPartition() calls this which in turn can be called from Hive.loadDynamicPartitions()
-   * among others
+   * Hive.loadPartition() calls this.
    * @param partition
    *          The partition to add
    * @return the partition added
@@ -985,6 +1003,34 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     return partition;
+  /**
+   * Adds several partitions in one call; loading dynamic partitions uses this. All partitions
+   * must belong to the same table: only the first element is inspected to decide whether the
+   * target is a temp table or has to be forwarded to the underlying client.
+   * @param partitions the new partitions to be added, must not be null
+   * @return number of partitions that were added (0 for an empty list)
+   * @throws TException if the underlying client or the temp table rejects the partitions
+   */
+  @Override
+  public int add_partitions(List<Partition> partitions) throws TException {
+    if (partitions.isEmpty()) {
+      return 0;
+    }
+    // The first partition stands in for the whole batch when resolving the target table.
+    Partition partition = partitions.get(0);
+    org.apache.hadoop.hive.metastore.api.Table table =
+            getTempTable(partition.getDbName(), partition.getTableName());
+    if (table == null) {
+      // not a temp table - Try underlying client
+      return super.add_partitions(partitions);
+    }
+    TempTable tt = getTempTable(table);
+    if (tt == null) {
+      // Trailing space keeps the table name from running into the message text.
+      throw new IllegalStateException("TempTable not found for " +
+              table.getTableName());
+    }
+    // The temp table receives deep copies rather than the caller's Partition objects.
+    return tt.addPartitions(deepCopyPartitions(partitions));
+  }
    * @param partialPvals partition values, can be partial.  This really means that missing values
    *                    are represented by empty str.
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
index aba63f0..90b5764 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
@@ -2468,7 +2468,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     return copy;
-  private List<Partition> deepCopyPartitions(List<Partition> partitions) {
+  protected List<Partition> deepCopyPartitions(List<Partition> partitions) {
     return deepCopyPartitions(partitions, null);