Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/29 21:54:20 UTC

svn commit: r1628281 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/MoveTask.java metadata/Hive.java

Author: gunther
Date: Mon Sep 29 19:54:20 2014
New Revision: 1628281

URL: http://svn.apache.org/r1628281
Log:
HIVE-7389: Reduce number of metastore calls in MoveTask (when loading dynamic partitions) (Rajesh Balamohan via Gunther Hagleitner)
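For context, a minimal standalone sketch of the idea behind the patch (the types below are simplified stand-ins for the real Hive classes, not the actual API): loadDynamicPartitions previously returned only the partition specs, forcing MoveTask to issue one getPartition() metastore call per dynamic partition; with this change it returns the Partition objects it already created, keyed by their spec, so those extra round trips disappear.

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified stand-in for org.apache.hadoop.hive.ql.metadata.Partition (illustration only).
    class PartitionStub {
      final Map<String, String> spec;
      PartitionStub(Map<String, String> spec) { this.spec = spec; }
    }

    public class LoadDynamicPartitionsSketch {

      // The load path keeps the Partition it created for each full partition
      // spec and returns the whole map, so the caller no longer needs a
      // per-partition getPartition() call against the metastore.
      static Map<Map<String, String>, PartitionStub> loadDynamicPartitions(
          List<Map<String, String>> fullPartSpecs) {
        Map<Map<String, String>, PartitionStub> partitionsMap =
            new LinkedHashMap<Map<String, String>, PartitionStub>();
        for (Map<String, String> spec : fullPartSpecs) {
          // Stands in for loadPartition(...), which moves the files and
          // already has the resulting Partition in hand.
          partitionsMap.put(spec, new PartitionStub(spec));
        }
        return partitionsMap;
      }

      public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<String, String>();
        spec.put("ds", "2014-09-29");
        Map<Map<String, String>, PartitionStub> dp =
            loadDynamicPartitions(Arrays.asList(spec));
        System.out.println("loaded partitions: " + dp.size());
      }
    }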

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1628281&r1=1628280&r2=1628281&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Sep 29 19:54:20 2014
@@ -353,6 +353,7 @@ public class MoveTask extends Task<MoveW
               pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
             }
 
+            long startTime = System.currentTimeMillis();
             // load the list of DP partitions and return the list of partition specs
             // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
             // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
@@ -360,7 +361,7 @@ public class MoveTask extends Task<MoveW
             // iterate over it and call loadPartition() here.
             // The reason we don't do inside HIVE-1361 is the latter is large and we
             // want to isolate any potential issue it may introduce.
-            ArrayList<LinkedHashMap<String, String>> dp =
+            Map<Map<String, String>, Partition> dp =
               db.loadDynamicPartitions(
                 tbd.getSourcePath(),
                 tbd.getTable().getTableName(),
@@ -370,16 +371,19 @@ public class MoveTask extends Task<MoveW
                 tbd.getHoldDDLTime(),
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
+            console.printInfo("\t Time taken for load dynamic partitions : "  +
+                (System.currentTimeMillis() - startTime));
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
                   " To turn off this error, set hive.error.on.empty.partition=false.");
             }
 
+            startTime = System.currentTimeMillis();
             // for each partition spec, get the partition
             // and put it to WriteEntity for post-exec hook
-            for (LinkedHashMap<String, String> partSpec: dp) {
-              Partition partn = db.getPartition(table, partSpec, false);
+            for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+              Partition partn = entry.getValue();
 
               if (bucketCols != null || sortCols != null) {
                 updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
@@ -412,8 +416,10 @@ public class MoveTask extends Task<MoveW
                     table.getCols());
               }
 
-              console.printInfo("\tLoading partition " + partSpec);
+              console.printInfo("\tLoading partition " + entry.getKey());
             }
+            console.printInfo("\t Time taken for adding to write entity : " +
+                (System.currentTimeMillis() - startTime));
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),

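On the consumer side, the same idea as a minimal sketch (hypothetical helper types, not the real MoveTask/WriteEntity code): where the old loop looked each partition up in the metastore, the new loop just iterates the entries of the returned map and reads the Partition values directly.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Simplified stand-in for the Partition returned by loadDynamicPartitions (illustration only).
    class LoadedPartition {
      final Map<String, String> spec;
      LoadedPartition(Map<String, String> spec) { this.spec = spec; }
    }

    public class MoveTaskConsumerSketch {

      // Old pattern: for each partSpec in the returned list, call
      // db.getPartition(table, partSpec, false) -- one metastore call per partition.
      // New pattern: the Partition is already the map value, so the loop only
      // reads entry.getValue() before doing the post-exec bookkeeping.
      static void registerWriteEntities(Map<Map<String, String>, LoadedPartition> dp) {
        for (Map.Entry<Map<String, String>, LoadedPartition> entry : dp.entrySet()) {
          LoadedPartition partn = entry.getValue();   // no metastore round trip
          System.out.println("\tLoading partition " + entry.getKey()
              + " -> " + partn.spec);
          // ... here MoveTask would update bucket/sort columns, stats, and add
          // the partition to the WriteEntity set for post-exec hooks.
        }
      }

      public static void main(String[] args) {
        Map<Map<String, String>, LoadedPartition> dp =
            new LinkedHashMap<Map<String, String>, LoadedPartition>();
        Map<String, String> spec = new LinkedHashMap<String, String>();
        spec.put("ds", "2014-09-29");
        dp.put(spec, new LoadedPartition(spec));
        registerWriteEntities(dp);
      }
    }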
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1628281&r1=1628280&r2=1628281&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Sep 29 19:54:20 2014
@@ -1237,6 +1237,15 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
+  public void loadPartition(Path loadPath, String tableName,
+      Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid) throws HiveException {
+    Table tbl = getTable(tableName);
+    loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid);
+  }
+
   /**
    * Load a directory into a Hive Table Partition - Alters existing content of
    * the partition with the contents of loadPath. - If the partition does not
@@ -1245,7 +1254,7 @@ public class Hive {
    *
    * @param loadPath
    *          Directory containing files to load into Table
-   * @param tableName
+   * @param  tbl
    *          name of table to be loaded.
    * @param partSpec
    *          defines which partition needs to be loaded
@@ -1258,12 +1267,12 @@ public class Hive {
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
-  public void loadPartition(Path loadPath, String tableName,
+  public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid) throws HiveException {
-    Table tbl = getTable(tableName);
     Path tblDataLocationPath =  tbl.getDataLocation();
+    Partition newTPart = null;
     try {
       /**
        * Move files before creating the partition since down stream processes
@@ -1312,10 +1321,10 @@ public class Hive {
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
+      boolean forceCreate = (!holdDDLTime) ? true : false;
+      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
-            inheritTableSpecs);
         if (isSkewedStoreAsSubdir) {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1325,9 +1334,9 @@ public class Hive {
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
-          alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+          alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
           newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
-          newCreatedTpart = newTPart.getTPartition();
+          return new Partition(tbl, newCreatedTpart);
         }
       }
     } catch (IOException e) {
@@ -1340,7 +1349,7 @@ public class Hive {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
-
+    return newTPart;
   }
 
   /**
@@ -1436,18 +1445,18 @@ private void constructOneLBLocationMap(F
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
-   * @return a list of strings with the dynamic partition paths
+   * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
       int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
     try {
-      ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
-        new ArrayList<LinkedHashMap<String, String>>();
+      Map<Map<String, String>, Partition> partitionsMap = new
+          LinkedHashMap<Map<String, String>, Partition>();
 
       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1481,6 +1490,7 @@ private void constructOneLBLocationMap(F
             + " to at least " + validPartitions.size() + '.');
       }
 
+      Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
@@ -1493,14 +1503,12 @@ private void constructOneLBLocationMap(F
         // generate a full partition specification
         LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        fullPartSpecs.add(fullPartSpec);
-
-        // finally load the partition -- move the file to the final table address
-        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-            listBucketingEnabled, false, isAcid);
+        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+            holdDDLTime, true, listBucketingEnabled, false, isAcid);
+        partitionsMap.put(fullPartSpec, newPartition);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
-      return fullPartSpecs;
+      return partitionsMap;
     } catch (IOException e) {
       throw new HiveException(e);
     }