You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/29 21:54:20 UTC
svn commit: r1628281 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/MoveTask.java metadata/Hive.java
Author: gunther
Date: Mon Sep 29 19:54:20 2014
New Revision: 1628281
URL: http://svn.apache.org/r1628281
Log:
HIVE-7389: Reduce number of metastore calls in MoveTask (when loading dynamic partitions) (Rajesh Balamohan via Gunther Hagleitner)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1628281&r1=1628280&r2=1628281&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Sep 29 19:54:20 2014
@@ -353,6 +353,7 @@ public class MoveTask extends Task<MoveW
pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
}
+ long startTime = System.currentTimeMillis();
// load the list of DP partitions and return the list of partition specs
// TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
// to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
@@ -360,7 +361,7 @@ public class MoveTask extends Task<MoveW
// iterate over it and call loadPartition() here.
// The reason we don't do inside HIVE-1361 is the latter is large and we
// want to isolate any potential issue it may introduce.
- ArrayList<LinkedHashMap<String, String>> dp =
+ Map<Map<String, String>, Partition> dp =
db.loadDynamicPartitions(
tbd.getSourcePath(),
tbd.getTable().getTableName(),
@@ -370,16 +371,19 @@ public class MoveTask extends Task<MoveW
tbd.getHoldDDLTime(),
isSkewedStoredAsDirs(tbd),
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
+ console.printInfo("\t Time taken for load dynamic partitions : " +
+ (System.currentTimeMillis() - startTime));
if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
throw new HiveException("This query creates no partitions." +
" To turn off this error, set hive.error.on.empty.partition=false.");
}
+ startTime = System.currentTimeMillis();
// for each partition spec, get the partition
// and put it to WriteEntity for post-exec hook
- for (LinkedHashMap<String, String> partSpec: dp) {
- Partition partn = db.getPartition(table, partSpec, false);
+ for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+ Partition partn = entry.getValue();
if (bucketCols != null || sortCols != null) {
updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
@@ -412,8 +416,10 @@ public class MoveTask extends Task<MoveW
table.getCols());
}
- console.printInfo("\tLoading partition " + partSpec);
+ console.printInfo("\tLoading partition " + entry.getKey());
}
+ console.printInfo("\t Time taken for adding to write entity : " +
+ (System.currentTimeMillis() - startTime));
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1628281&r1=1628280&r2=1628281&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Sep 29 19:54:20 2014
@@ -1237,6 +1237,15 @@ public class Hive {
return getDatabase(currentDb);
}
+ /**
+ * Loads a directory into a partition of the named table. Convenience overload
+ * that resolves the table with getTable(tableName) and then delegates to
+ * loadPartition(Path, Table, ...). The Partition returned by that overload is
+ * discarded here, so callers that need it should call the Table-based overload
+ * directly. Note that every call to this overload performs its own getTable
+ * lookup.
+ *
+ * @param loadPath directory containing files to load into the partition
+ * @param tableName name of the table, resolved via getTable(tableName)
+ * @param partSpec defines which partition needs to be loaded
+ * @param replace passed through unchanged to the Table-based overload
+ * @param holdDDLTime passed through unchanged to the Table-based overload
+ * @param inheritTableSpecs passed through unchanged to the Table-based overload
+ * @param isSkewedStoreAsSubdir passed through unchanged to the Table-based overload
+ * @param isSrcLocal if the source directory is LOCAL
+ * @param isAcid passed through unchanged to the Table-based overload
+ * @throws HiveException propagated from the table lookup or the delegated load
+ */
+ public void loadPartition(Path loadPath, String tableName,
+ Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
+ boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+ boolean isSrcLocal, boolean isAcid) throws HiveException {
+ Table tbl = getTable(tableName);
+ loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
+ isSkewedStoreAsSubdir, isSrcLocal, isAcid);
+ }
+
/**
* Load a directory into a Hive Table Partition - Alters existing content of
* the partition with the contents of loadPath. - If the partition does not
@@ -1245,7 +1254,7 @@ public class Hive {
*
* @param loadPath
* Directory containing files to load into Table
- * @param tableName
+ * @param tbl
* name of table to be loaded.
* @param partSpec
* defines which partition needs to be loaded
@@ -1258,12 +1267,12 @@ public class Hive {
* @param isSrcLocal
* If the source directory is LOCAL
*/
- public void loadPartition(Path loadPath, String tableName,
+ public Partition loadPartition(Path loadPath, Table tbl,
Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
boolean isSrcLocal, boolean isAcid) throws HiveException {
- Table tbl = getTable(tableName);
Path tblDataLocationPath = tbl.getDataLocation();
+ Partition newTPart = null;
try {
/**
* Move files before creating the partition since down stream processes
@@ -1312,10 +1321,10 @@ public class Hive {
Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
}
+ boolean forceCreate = (!holdDDLTime) ? true : false;
+ newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
// recreate the partition if it existed before
if (!holdDDLTime) {
- Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
- inheritTableSpecs);
if (isSkewedStoreAsSubdir) {
org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1325,9 +1334,9 @@ public class Hive {
/* Add list bucketing location mappings. */
skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
- alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+ alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
- newCreatedTpart = newTPart.getTPartition();
+ return new Partition(tbl, newCreatedTpart);
}
}
} catch (IOException e) {
@@ -1340,7 +1349,7 @@ public class Hive {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
-
+ return newTPart;
}
/**
@@ -1436,18 +1445,18 @@ private void constructOneLBLocationMap(F
* @param replace
* @param numDP number of dynamic partitions
* @param holdDDLTime
- * @return a list of strings with the dynamic partition paths
+ * @return partition map details (PartitionSpec and Partition)
* @throws HiveException
*/
- public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+ public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
try {
- ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
- new ArrayList<LinkedHashMap<String, String>>();
+ Map<Map<String, String>, Partition> partitionsMap = new
+ LinkedHashMap<Map<String, String>, Partition>();
FileSystem fs = loadPath.getFileSystem(conf);
FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1481,6 +1490,7 @@ private void constructOneLBLocationMap(F
+ " to at least " + validPartitions.size() + '.');
}
+ Table tbl = getTable(tableName);
// for each dynamically created DP directory, construct a full partition spec
// and load the partition based on that
Iterator<Path> iter = validPartitions.iterator();
@@ -1493,14 +1503,12 @@ private void constructOneLBLocationMap(F
// generate a full partition specification
LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
Warehouse.makeSpecFromName(fullPartSpec, partPath);
- fullPartSpecs.add(fullPartSpec);
-
- // finally load the partition -- move the file to the final table address
- loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
- listBucketingEnabled, false, isAcid);
+ Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+ holdDDLTime, true, listBucketingEnabled, false, isAcid);
+ partitionsMap.put(fullPartSpec, newPartition);
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
- return fullPartSpecs;
+ return partitionsMap;
} catch (IOException e) {
throw new HiveException(e);
}