Posted to commits@hive.apache.org by se...@apache.org on 2018/10/26 21:12:22 UTC
[72/75] [abbrv] hive git commit: HIVE-20661: Dynamic partitions loading calls add partition for every partition 1-by-1 (Laszlo Pinter via Peter Vary)
HIVE-20661: Dynamic partitions loading calls add partition for every partition 1-by-1 (Laszlo Pinter via Peter Vary)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0d701546
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0d701546
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0d701546
Branch: refs/heads/master-tez092
Commit: 0d7015468611c485c01bb80cdbe4c89e4e68b680
Parents: 50a96d7
Author: Laszlo Pinter <lp...@cloudera.com>
Authored: Fri Oct 26 13:43:53 2018 +0200
Committer: Peter Vary <pv...@cloudera.com>
Committed: Fri Oct 26 13:43:53 2018 +0200
----------------------------------------------------------------------
.../metastore/SynchronizedMetaStoreClient.java | 4 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 495 +++++++++++++------
.../ql/metadata/SessionHiveMetaStoreClient.java | 52 +-
.../hive/metastore/HiveMetaStoreClient.java | 2 +-
4 files changed, 398 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
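For readers skimming the diff: before this patch, Hive.loadDynamicPartitions() ended up issuing one add_partition call against the metastore for every dynamic partition it created; with this patch, all newly created partitions are sent in a single add_partitions call, falling back to the 1-by-1 path only when the batch hits AlreadyExistsException. A minimal sketch of the before/after shape (illustrative only, not code from this patch; the real call sites carry many more parameters):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.thrift.TException;

class BatchAddSketch {
  // Before HIVE-20661: one metastore round trip per dynamic partition.
  static void addOneByOne(SynchronizedMetaStoreClient msc, List<Partition> parts)
      throws TException {
    for (Partition p : parts) {
      msc.add_partition(p.getTPartition());
    }
  }

  // After HIVE-20661: a single batched round trip for all new partitions.
  static void addBatched(SynchronizedMetaStoreClient msc, List<Partition> parts)
      throws TException {
    msc.add_partitions(parts.stream()
        .map(Partition::getTPartition)
        .collect(Collectors.toList()));
  }
}

----------------------------------------------------------------------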
http://git-wip-us.apache.org/repos/asf/hive/blob/0d701546/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index 0ab77e8..e8f3623 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -78,6 +78,10 @@ public final class SynchronizedMetaStoreClient {
return client.add_partition(partition);
}
+ public synchronized int add_partitions(List<Partition> partitions) throws TException {
+ return client.add_partitions(partitions);
+ }
+
public synchronized void alter_partition(String catName, String dbName, String tblName,
Partition newPart, EnvironmentContext environmentContext, String writeIdList) throws TException {
client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
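The new add_partitions wrapper keeps this class's contract: every metastore call is serialized on the wrapper instance, so one underlying Thrift client (not safe for concurrent use on its own) can be shared by the parallel partition-load tasks introduced later in this patch. A condensed sketch of the pattern, reduced to the one new method:

import java.util.List;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;

final class SynchronizedClientSketch {
  private final IMetaStoreClient client; // not thread-safe by itself

  SynchronizedClientSketch(IMetaStoreClient client) {
    this.client = client;
  }

  // All callers block on the same monitor, so concurrent load tasks
  // never interleave requests on the shared connection.
  public synchronized int add_partitions(List<Partition> partitions) throws TException {
    return client.add_partitions(partitions);
  }
}
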
http://git-wip-us.apache.org/repos/asf/hive/blob/0d701546/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4de0389..012a670 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -45,6 +45,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -96,18 +97,17 @@ import org.apache.hadoop.hive.io.HdfsUtils;
import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -1866,17 +1866,102 @@ public class Hive {
boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId,
int stmtId, boolean isInsertOverwrite) throws HiveException {
+
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
+
+ // Get the partition object if it already exists
+ Partition oldPart = getPartition(tbl, partSpec, false);
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
+
+ Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+ loadFileType, inheritTableSpecs,
+ inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+ hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+ newTPart.getTable(), true);
+ if (tableSnapshot != null) {
+ newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
+ }
+
+ if (oldPart == null) {
+ addPartitionToMetastore(newTPart, hasFollowingStatsTask, tbl, tableSnapshot);
+ // For acid table, add the acid_write event with file list at the time of load itself. But
+ // it should be done after partition is created.
+ if (isTxnTable && (null != newFiles)) {
+ addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ }
+ } else {
+ try {
+ setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+ } catch (TException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+ }
+
+ perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
+
+ return newTPart;
+ }
+
+ /**
+ * Move all the files from loadPath into Hive. If the partition
+ * does not exist - one is created - files in loadPath are moved into Hive. But the
+ * directory itself is not removed.
+ *
+ * @param loadPath
+ * Directory containing files to load into Table
+ * @param tbl
+ * name of table to be loaded.
+ * @param partSpec
+ * defines which partition needs to be loaded
+ * @param oldPart
+ * already existing partition object, can be null
+ * @param loadFileType
+ * if REPLACE_ALL - replace files in the table,
+ * otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
+ * @param inheritTableSpecs if true, on [re]creating the partition, take the
+ * location/inputformat/outputformat/serde details from table spec
+ * @param inheritLocation
+ * if true, partition path is generated from table
+ * @param isSkewedStoreAsSubdir
+ * if true, skewed is stored as sub-directory
+ * @param isSrcLocal
+ * If the source directory is LOCAL
+ * @param isAcidIUDoperation
+ * true if this is an ACID Insert/Update/Delete operation
+ * @param hasFollowingStatsTask
+ * true if there is a following task which updates the stats, so this method need not update them.
+ * @param writeId
+ * write ID allocated for the current load operation
+ * @param stmtId
+ * statement ID of the current load statement
+ * @param isInsertOverwrite
+ * @param isTxnTable
+ *
+ * @return Partition object being loaded with data
+ * @throws HiveException
+ */
+ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map<String, String> partSpec,
+ Partition oldPart, LoadFileType loadFileType, boolean inheritTableSpecs,
+ boolean inheritLocation, boolean isSkewedStoreAsSubdir,
+ boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+ Long writeId, int stmtId, boolean isInsertOverwrite,
+ boolean isTxnTable, List<Path> newFiles) throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
- boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
try {
PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
- // Get the partition object if it already exists
- Partition oldPart = getPartition(tbl, partSpec, false);
/**
* Move files before creating the partition since down stream processes
* check for existence of partition in metadata before accessing the data.
@@ -1908,17 +1993,9 @@ public class Hive {
newPartPath = oldPartPath == null
? genPartPathFromTable(tbl, partSpec, tblDataLocationPath) : oldPartPath;
}
- List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
- // If config is set, table is not temporary and partition being inserted exists, capture
- // the list of files added. For not yet existing partitions (insert overwrite to new partition
- // or dynamic partition inserts), the add partition event will capture the list of files added.
- if (areEventsForDmlNeeded(tbl, oldPart)) {
- newFiles = Collections.synchronizedList(new ArrayList<Path>());
- }
-
// Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
// like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
// to ACID updates. So they are not themselves ACID.
@@ -1975,11 +2052,6 @@ public class Hive {
Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
validatePartition(newTPart);
- AcidUtils.TableSnapshot tableSnapshot = null;
- tableSnapshot = AcidUtils.getTableSnapshot(conf, newTPart.getTable(), true);
- if (tableSnapshot != null) {
- newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
- }
// If config is set, table is not temporary and partition being inserted exists, capture
// the list of files added. For not yet existing partitions (insert overwrite to new partition
@@ -2037,55 +2109,96 @@ public class Hive {
// The ACID state is probably absent. Warning is logged in the get method.
MetaStoreServerUtils.clearQuickStats(newTPart.getParameters());
}
- try {
- LOG.debug("Adding new partition " + newTPart.getSpec());
- getSynchronizedMSC().add_partition(newTPart.getTPartition());
- } catch (AlreadyExistsException aee) {
- // With multiple users concurrently issuing insert statements on the same partition has
- // a side effect that some queries may not see a partition at the time when they're issued,
- // but will realize the partition is actually there when it is trying to add such partition
- // to the metastore and thus get AlreadyExistsException, because some earlier query just created it (race condition).
- // For example, imagine such a table is created:
- // create table T (name char(50)) partitioned by (ds string);
- // and the following two queries are launched at the same time, from different sessions:
- // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
- // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
- // In that case, we want to retry with alterPartition.
- LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
- setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
- } catch (Exception e) {
- try {
- final FileSystem newPathFileSystem = newPartPath.getFileSystem(this.getConf());
- boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
- final FileStatus status = newPathFileSystem.getFileStatus(newPartPath);
- Hive.trashFiles(newPathFileSystem, new FileStatus[] {status}, this.getConf(), isAutoPurge);
- } catch (IOException io) {
- LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
- }
- throw e;
- }
-
- // For acid table, add the acid_write event with file list at the time of load itself. But
- // it should be done after partition is created.
- if (isTxnTable && (null != newFiles)) {
- addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
- }
- } else {
- setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
}
- perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
return newTPart;
- } catch (IOException e) {
+ } catch (IOException | MetaException | InvalidOperationException e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
- } catch (MetaException e) {
- LOG.error(StringUtils.stringifyException(e));
- throw new HiveException(e);
- } catch (InvalidOperationException e) {
+ }
+ }
+
+ private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask,
+ Table tbl, TableSnapshot tableSnapshot) throws HiveException {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding new partition " + newTPart.getSpec());
+ }
+ getSynchronizedMSC().add_partition(newTPart.getTPartition());
+ } catch (AlreadyExistsException aee) {
+ // With multiple users concurrently issuing insert statements on the same partition, some
+ // queries may not see the partition at the time they are issued, but will realize it is
+ // actually there when trying to add it to the metastore, and thus get AlreadyExistsException,
+ // because some earlier query just created it (race condition).
+ // For example, imagine such a table is created:
+ // create table T (name char(50)) partitioned by (ds string);
+ // and the following two queries are launched at the same time, from different sessions:
+ // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+ // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+ // In that case, we want to retry with alterPartition.
+ LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
+ try {
+ setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+ } catch (TException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+ } catch (Exception e) {
+ try {
+ final FileSystem newPathFileSystem = newTPart.getPartitionPath().getFileSystem(this.getConf());
+ boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+ final FileStatus status = newPathFileSystem.getFileStatus(newTPart.getPartitionPath());
+ Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+ } catch (IOException io) {
+ LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+ }
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
- } catch (TException e) {
+ }
+ }
+
+ private void addPartitionsToMetastore(List<Partition> partitions,
+ boolean hasFollowingStatsTask, Table tbl,
+ List<AcidUtils.TableSnapshot> tableSnapshots)
+ throws HiveException {
+ try {
+ if (LOG.isDebugEnabled()) {
+ StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
+ partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
+ LOG.debug(debugMsg.toString());
+ }
+ getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+ .collect(Collectors.toList()));
+ } catch (AlreadyExistsException aee) {
+ // With multiple users concurrently issuing insert statements on the same partition, some
+ // queries may not see the partition at the time they are issued, but will realize it is
+ // actually there when trying to add it to the metastore, and thus get AlreadyExistsException,
+ // because some earlier query just created it (race condition).
+ // For example, imagine such a table is created:
+ // create table T (name char(50)) partitioned by (ds string);
+ // and the following two queries are launched at the same time, from different sessions:
+ // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+ // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+ // In that case, we want to retry with alterPartition.
+ LOG.debug("Caught AlreadyExistsException, trying to add partitions one by one.");
+ assert partitions.size() == tableSnapshots.size();
+ for (int i = 0; i < partitions.size(); i++) {
+ addPartitionToMetastore(partitions.get(i), hasFollowingStatsTask, tbl,
+ tableSnapshots.get(i));
+ }
+ } catch (Exception e) {
+ try {
+ for (Partition partition : partitions) {
+ final FileSystem newPathFileSystem = partition.getPartitionPath().getFileSystem(this.getConf());
+ boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+ final FileStatus status = newPathFileSystem.getFileStatus(partition.getPartitionPath());
+ Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+ }
+ } catch (IOException io) {
+ LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+ }
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
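Taken together, the two helpers above give a batch-first strategy: addPartitionsToMetastore() tries one add_partitions call, degrades to the per-partition addPartitionToMetastore() path only when the batch hits AlreadyExistsException (so the existing alter-partition retry for the race condition still applies), and on any other failure trashes the already-moved files before rethrowing. A condensed control-flow sketch; msc, toThriftPartitions, addSinglePartition and trashPartitionDirs are hypothetical stand-ins for the patch's fields and private logic:

import java.util.List;

import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;

void addPartitionsBatchFirst(List<Partition> partitions) throws HiveException {
  try {
    msc.add_partitions(toThriftPartitions(partitions)); // one RPC for the whole batch
  } catch (AlreadyExistsException aee) {
    // A partition was created concurrently: fall back to the 1-by-1 path,
    // which retries each conflict with an alter instead of failing the load.
    for (Partition partition : partitions) {
      addSinglePartition(partition);
    }
  } catch (Exception e) {
    trashPartitionDirs(partitions); // undo the file moves before rethrowing
    throw new HiveException(e);
  }
}
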
@@ -2375,16 +2488,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
- final Map<Map<String, String>, Partition> partitionsMap =
- Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
-
- int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
- final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("load-dynamic-partitions-%d")
- .build());
-
// Get all valid partition paths and existing partitions for them (if any)
final Table tbl = getTable(tableName);
final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
@@ -2392,113 +2495,193 @@ private void constructOneLBLocationMap(FileStatus fSta,
final int partsToLoad = validPartitions.size();
final AtomicInteger partitionsLoaded = new AtomicInteger(0);
-
final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
&& InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+
final SessionState parentSession = SessionState.get();
+ List<Callable<Partition>> tasks = Lists.newLinkedList();
- final List<Future<Void>> futures = Lists.newLinkedList();
- // for each dynamically created DP directory, construct a full partition spec
- // and load the partition based on that
- final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
- try {
- for(final Path partPath : validPartitions) {
- // generate a full partition specification
- final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
- if (!Warehouse.makeSpecFromName(
- fullPartSpec, partPath, new HashSet<String>(partSpec.keySet()))) {
- Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
- continue;
- }
- futures.add(pool.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- // move file would require session details (needCopy() invokes SessionState.get)
- SessionState.setCurrentSessionState(parentSession);
- LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
-
- // load the partition
- Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
- true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
- isInsertOverwrite);
- partitionsMap.put(fullPartSpec, newPartition);
-
- if (inPlaceEligible) {
- synchronized (ps) {
- InPlaceUpdate.rePositionCursor(ps);
- partitionsLoaded.incrementAndGet();
- InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
- + partsToLoad + " partitions.");
- }
- }
- return null;
- } catch (Exception t) {
- LOG.error("Exception when loading partition with parameters "
- + " partPath=" + partPath + ", "
+ final class PartitionDetails {
+ Map<String, String> fullSpec;
+ Partition partition;
+ List<Path> newFiles;
+ boolean hasOldPartition = false;
+ AcidUtils.TableSnapshot tableSnapshot;
+ }
+
+ Map<Path, PartitionDetails> partitionDetailsMap =
+ Collections.synchronizedMap(new LinkedHashMap<>());
+
+ // calculate full path spec for each valid partition path
+ validPartitions.forEach(partPath -> {
+ Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+ if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+ Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+ } else {
+ PartitionDetails details = new PartitionDetails();
+ details.fullSpec = fullPartSpec;
+ partitionDetailsMap.put(partPath, details);
+ }
+ });
+
+ // fetch all the partitions matching the part spec using the partition iterable
+ // this way the maximum batch size configuration parameter is considered
+ PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+ conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+ Iterator<Partition> iterator = partitionIterable.iterator();
+
+ // Match valid partition path to partitions
+ while (iterator.hasNext()) {
+ Partition partition = iterator.next();
+ partitionDetailsMap.entrySet().parallelStream()
+ .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
+ .findAny().ifPresent(entry -> {
+ entry.getValue().partition = partition;
+ entry.getValue().hasOldPartition = true;
+ });
+ }
+
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+
+ for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+ tasks.add(() -> {
+ PartitionDetails partitionDetails = entry.getValue();
+ Map<String, String> fullPartSpec = partitionDetails.fullSpec;
+ try {
+
+ SessionState.setCurrentSessionState(parentSession);
+ LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+
+ List<Path> newFiles = Lists.newArrayList();
+ Partition oldPartition = partitionDetails.partition;
+ // load the partition
+ Partition partition = loadPartitionInternal(entry.getKey(), tbl,
+ fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
+ hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ // if the partition already existed before the loading, no need to add it again to the
+ // metastore
+
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+ partition.getTable(), true);
+ if (tableSnapshot != null) {
+ partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
+ }
+ partitionDetails.tableSnapshot = tableSnapshot;
+ if (oldPartition == null) {
+ partitionDetails.newFiles = newFiles;
+ partitionDetails.partition = partition;
+ }
+
+ if (inPlaceEligible) {
+ synchronized (ps) {
+ InPlaceUpdate.rePositionCursor(ps);
+ partitionsLoaded.incrementAndGet();
+ InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ + partsToLoad + " partitions.");
+ }
+ }
+
+ return partition;
+ } catch (Exception e) {
+ LOG.error("Exception when loading partition with parameters "
+ + " partPath=" + entry.getKey() + ", "
+ " table=" + tbl.getTableName() + ", "
+ " partSpec=" + fullPartSpec + ", "
+ " loadFileType=" + loadFileType.toString() + ", "
+ " listBucketingLevel=" + numLB + ", "
+ " isAcid=" + isAcid + ", "
- + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
- throw t;
- } finally {
- // Add embedded rawstore, so we can cleanup later to avoid memory leak
- if (getMSC().isLocalMetaStore()) {
- Long threadId = Thread.currentThread().getId();
- RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
- if (threadLocalRawStore == null) {
- // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap.
- rawStoreMap.remove(threadId);
- } else {
- // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore.
- // So, overwrite the old one if exists in rawStoreMap.
- rawStoreMap.put(threadId, threadLocalRawStore);
- }
- }
- }
- }
- }));
- }
- pool.shutdown();
- LOG.debug("Number of partitions to be added is " + futures.size());
+ + " hasFollowingStatsTask=" + hasFollowingStatsTask, e);
+ throw e;
+ }
+ });
+ }
- for (Future future : futures) {
- future.get();
+ int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+ ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitionsToAdd-%d").build());
+
+ List<Future<Partition>> futures = Lists.newLinkedList();
+ Map<Map<String, String>, Partition> result = Maps.newLinkedHashMap();
+ try {
+ futures = executor.invokeAll(tasks);
+ LOG.debug("Number of partitionsToAdd to be added is " + futures.size());
+ for (Future<Partition> future : futures) {
+ Partition partition = future.get();
+ result.put(partition.getSpec(), partition);
+ }
+ // add new partitions in batch
+
+ addPartitionsToMetastore(
+ partitionDetailsMap.entrySet()
+ .parallelStream()
+ .filter(entry -> !entry.getValue().hasOldPartition)
+ .map(entry -> entry.getValue().partition)
+ .collect(Collectors.toList()),
+ hasFollowingStatsTask,
+ tbl,
+ partitionDetailsMap.entrySet()
+ .parallelStream()
+ .filter(entry -> !entry.getValue().hasOldPartition)
+ .map(entry -> entry.getValue().tableSnapshot)
+ .collect(Collectors.toList()));
+ // For acid table, add the acid_write event with file list at the time of load itself. But
+ // it should be done after partition is created.
+
+ for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+ PartitionDetails partitionDetails = entry.getValue();
+ if (isTxnTable && partitionDetails.newFiles != null) {
+ addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+ }
+ if (partitionDetails.hasOldPartition) {
+ setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
+ partitionDetails.tableSnapshot);
+ }
}
} catch (InterruptedException | ExecutionException e) {
- LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
- //cancel other futures
- for (Future future : futures) {
- future.cancel(true);
- }
- throw new HiveException("Exception when loading "
- + partsToLoad + " in table " + tbl.getTableName()
- + " with loadPath=" + loadPath, e);
+ throw new HiveException("Exception when loading " + validPartitions.size()
+ + " in table " + tbl.getTableName()
+ + " with loadPath=" + loadPath);
+ } catch (TException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ } catch (Exception e) {
+
+ StringBuffer logMsg = new StringBuffer();
+ logMsg.append("Exception when loading partitionsToAdd with parameters ");
+ logMsg.append("partPaths=");
+ validPartitions.forEach(path -> logMsg.append(path + ", "));
+ logMsg.append("table=" + tbl.getTableName() + ", ").
+ append("partSpec=" + partSpec + ", ").
+ append("loadFileType=" + loadFileType.toString() + ", ").
+ append("listBucketingLevel=" + numLB + ", ").
+ append("isAcid=" + isAcid + ", ").
+ append("hasFollowingStatsTask=" + hasFollowingStatsTask);
+
+ LOG.error(logMsg.toString(), e);
+ throw e;
} finally {
- rawStoreMap.forEach((k, rs) -> rs.shutdown());
+ LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+ executor.shutdownNow();
}
try {
if (isAcid) {
- List<String> partNames = new ArrayList<>(partitionsMap.size());
- for (Partition p : partitionsMap.values()) {
- partNames.add(p.getName());
- }
+ List<String> partNames =
+ result.values().parallelStream().map(Partition::getName).collect(Collectors.toList());
getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
tbl.getDbName(), tbl.getTableName(), partNames,
AcidUtils.toDataOperationType(operation));
}
- LOG.info("Loaded " + partitionsMap.size() + " partitions");
+ LOG.info("Loaded " + result.size() + "partitionsToAdd");
perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
- return partitionsMap;
+ return result;
} catch (TException te) {
+ LOG.error(StringUtils.stringifyException(te));
throw new HiveException("Exception updating metastore for acid table "
- + tableName + " with partitions " + partitionsMap.values(), te);
+ + tableName + " with partitions " + result.values(), te);
}
}
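For orientation, the rewritten loadDynamicPartitions() above proceeds in four phases: build a PartitionDetails entry per valid partition directory, match pre-existing partitions through a batched PartitionIterable fetch, run the file moves as parallel Callables, and finally issue one batched metastore add for the partitions that did not exist before. A hedged sketch of the executor phase; buildLoadTasks and newPartitionsOnly are hypothetical stand-ins for logic the patch inlines:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.ql.metadata.Partition;

void loadInParallelThenAddInBatch(int poolSize) throws Exception {
  List<Callable<Partition>> tasks = buildLoadTasks();
  ExecutorService executor = Executors.newFixedThreadPool(poolSize,
      new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("load-dynamic-partitions-%d").build());
  try {
    List<Future<Partition>> futures = executor.invokeAll(tasks); // parallel file moves
    for (Future<Partition> future : futures) {
      future.get(); // surface any per-partition failure before touching the metastore
    }
    addPartitionsToMetastore(newPartitionsOnly(futures)); // single batched RPC
  } finally {
    executor.shutdownNow();
  }
}
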
@@ -3184,6 +3367,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
List<String> names = null;
try {
names = getMSC().listPartitionNames(dbName, tblName, max);
+ } catch (NoSuchObjectException nsoe) {
+ // this means no partition exists for the given dbName and tblName
+ // key value pairs - thrift cannot handle null return values, hence
+ // listPartitionNames() throws NoSuchObjectException to indicate null partitions
+ return Lists.newArrayList();
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
@@ -3200,6 +3388,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
try {
names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
+ } catch (NoSuchObjectException nsoe) {
+ // this means no partition exists for the given partition spec
+ // key value pairs - thrift cannot handle null return values, hence
+ // listPartitionNames() throws NoSuchObjectException to indicate null partitions
+ return Lists.newArrayList();
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
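The two listPartitionNames hunks above turn a metastore error into an empty result: Thrift cannot return null, so the server signals "no matching partitions" with NoSuchObjectException, and the client now maps that to an empty list instead of wrapping it in a HiveException. A hedged usage sketch, assuming the enclosing methods are the Hive.getPartitionNames() overloads as the hunk context suggests:

import java.util.List;

import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;

class PartitionNamesSketch {
  // After this patch, a missing partition set is an ordinary empty result
  // rather than an exception the caller has to catch.
  static boolean hasPartitions(Hive hive, String dbName, String tblName) throws HiveException {
    List<String> names = hive.getPartitionNames(dbName, tblName, (short) -1);
    return !names.isEmpty();
  }
}

----------------------------------------------------------------------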
http://git-wip-us.apache.org/repos/asf/hive/blob/0d701546/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index dd23d7d..322b580 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -901,14 +901,23 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
assertPartitioned();
pTree.addPartition(p);
}
+
private Partition getPartition(String partName) throws MetaException {
assertPartitioned();
return pTree.getPartition(partName);
}
+
+ private int addPartitions(List<Partition> partitions) throws AlreadyExistsException,
+ MetaException {
+ assertPartitioned();
+ return pTree.addPartitions(partitions);
+ }
+
private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
assertPartitioned();
return pTree.getPartitions(partialPartVals);
}
+
private void assertPartitioned() throws MetaException {
if(tTable.getPartitionKeysSize() <= 0) {
throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned");
@@ -939,6 +948,17 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
private Partition getPartition(String partName) {
return parts.get(partName);
}
+
+ private int addPartitions(List<Partition> partitions)
+ throws AlreadyExistsException, MetaException {
+ int partitionsAdded = 0;
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ partitionsAdded++;
+ }
+ return partitionsAdded;
+ }
+
/**
* Provided values for the 1st N partition columns, will return all matching PartitionS
* The list is a partial list of partition values in the same order as partition columns.
@@ -960,9 +980,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
}
}
/**
- * Loading Dynamic Partitons calls this.
- * Hive.loadPartition() calls this which in turn can be called from Hive.loadDynamicPartitions()
- * among others
+ * Hive.loadPartition() calls this.
* @param partition
* The partition to add
* @return the partition added
@@ -985,6 +1003,34 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
tt.addPartition(deepCopy(partition));
return partition;
}
+
+ /**
+ * Loading Dynamic Partitions calls this. The partitions which are loaded must belong to
+ * the same table.
+ * @param partitions the new partitions to be added, must be not null
+ * @return number of partitions that were added
+ * @throws TException
+ */
+ @Override
+ public int add_partitions(List<Partition> partitions) throws TException {
+ if (partitions.isEmpty()) {
+ return 0;
+ }
+ Partition partition = partitions.get(0);
+ org.apache.hadoop.hive.metastore.api.Table table =
+ getTempTable(partition.getDbName(), partition.getTableName());
+ if (table == null) {
+ // not a temp table - Try underlying client
+ return super.add_partitions(partitions);
+ }
+ TempTable tt = getTempTable(table);
+ if (tt == null) {
+ throw new IllegalStateException("TempTable not found for" +
+ table.getTableName());
+ }
+ return tt.addPartitions(deepCopyPartitions(partitions));
+ }
+
/**
* @param partialPvals partition values, can be partial. This really means that missing values
* are represented by empty str.
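One behavioral note on the temp-table path added above: TempTable.addPartitions() applies the list one element at a time, so an AlreadyExistsException thrown midway leaves the earlier partitions in place; the temp-table batch is not all-or-nothing (the server-side add_partitions call, by contrast, is generally transactional). A hedged illustration, where pNew and pExisting are assumed example partitions on a temp table and pExisting is already present:

import java.util.Arrays;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;

class TempTableBatchNote {
  static void demo(IMetaStoreClient sessionClient, Partition pNew, Partition pExisting)
      throws TException {
    try {
      sessionClient.add_partitions(Arrays.asList(pNew, pExisting));
    } catch (AlreadyExistsException e) {
      // pNew was already added to the session-local TempTable before the
      // failure, so the caller cannot assume all-or-nothing semantics here.
    }
  }
}

----------------------------------------------------------------------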
http://git-wip-us.apache.org/repos/asf/hive/blob/0d701546/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index aba63f0..90b5764 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2468,7 +2468,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
return copy;
}
- private List<Partition> deepCopyPartitions(List<Partition> partitions) {
+ protected List<Partition> deepCopyPartitions(List<Partition> partitions) {
return deepCopyPartitions(partitions, null);
}