Posted to commits@hive.apache.org by he...@apache.org on 2011/07/21 08:46:20 UTC

svn commit: r1149047 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ExecDriver.java exec/FileSinkOperator.java exec/Utilities.java io/RCFileOutputFormat.java io/rcfile/merge/BlockMergeTask.java io/rcfile/merge/RCFileMergeMapper.java

Author: heyongqiang
Date: Thu Jul 21 06:46:17 2011
New Revision: 1149047

URL: http://svn.apache.org/viewvc?rev=1149047&view=rev
Log:
HIVE-2201: Reduce NameNode calls in Hive by creating temporary directories (Siying Dong via He Yongqiang)

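In short: before this change, each task probed for and created the job's temporary output directories on its own, so the same exists()/mkdirs() calls hit the NameNode once per task. Now the client creates the "_tmp." directories once, before the job is submitted (ExecDriver.createTmpDirs() below, plus an inline equivalent in BlockMergeTask), and in-flight task output moves to a sibling "_task_tmp." directory, so scratch files and committed files never share a directory.
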
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jul 21 06:46:17 2011
@@ -108,6 +108,7 @@ public class ExecDriver extends Task<Map
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
+  @Override
   public boolean requireLock() {
     return true;
   }
@@ -205,6 +206,44 @@ public class ExecDriver extends Task<Map
     return false;
   }
 
+  protected void createTmpDirs() throws IOException {
+    // fix up outputs
+    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    if (pa != null) {
+      ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+
+      if (work.getReducer() != null) {
+        opList.add(work.getReducer());
+      }
+
+      for (List<String> ls : pa.values()) {
+        for (String a : ls) {
+          opList.add(work.getAliasToWork().get(a));
+
+          while (!opList.isEmpty()) {
+            Operator<? extends Serializable> op = opList.remove(0);
+
+            if (op instanceof FileSinkOperator) {
+              FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+              String tempDir = fdesc.getDirName();
+
+              if (tempDir != null) {
+                Path tempPath = Utilities.toTempPath(new Path(tempDir));
+                LOG.info("Making Temp Directory: " + tempDir);
+                FileSystem fs = tempPath.getFileSystem(job);
+                fs.mkdirs(tempPath);
+              }
+            }
+
+            if (op.getChildOperators() != null) {
+              opList.addAll(op.getChildOperators());
+            }
+          }
+        }
+      }
+    }
+  }
+
    /**
    * Execute a query plan using Hadoop.
    */
@@ -405,6 +444,8 @@ public class ExecDriver extends Task<Map
         }
       }
 
+      this.createTmpDirs();
+
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
       // replace it back

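createTmpDirs() walks the operator tree breadth-first, starting from the reducer (if any) and each map-side alias root, and pre-creates the "_tmp."-prefixed output directory of every FileSinkOperator it reaches, so the mkdirs happens once on the client instead of in every task. The same queue-based traversal, reduced to a self-contained sketch (OpNode is a hypothetical stand-in for Hive's Operator tree, not a real Hive class):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    /** Hypothetical stand-in for Hive's Operator<? extends Serializable>. */
    class OpNode {
      final String outputDir; // non-null only on file-sink nodes
      final List<OpNode> children = new ArrayList<OpNode>();
      OpNode(String outputDir) { this.outputDir = outputDir; }
    }

    public class TmpDirWalk {
      /**
       * Breadth-first walk mirroring the queue in createTmpDirs():
       * pop the head, record a sink's output dir, push the children.
       * In the real code each collected dir becomes
       * Utilities.toTempPath(dir) followed by fs.mkdirs().
       */
      static List<String> collectSinkDirs(List<OpNode> roots) {
        List<String> dirs = new ArrayList<String>();
        Deque<OpNode> queue = new ArrayDeque<OpNode>(roots);
        while (!queue.isEmpty()) {
          OpNode op = queue.removeFirst();
          if (op.outputDir != null) {
            dirs.add(op.outputDir);
          }
          queue.addAll(op.children);
        }
        return dirs;
      }
    }
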
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Jul 21 06:46:17 2011
@@ -52,10 +52,10 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -100,6 +100,7 @@ public class FileSinkOperator extends Te
 
   public class FSPaths implements Cloneable {
     Path tmpPath;
+    Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
@@ -110,6 +111,7 @@ public class FileSinkOperator extends Te
 
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
+      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
@@ -129,6 +131,14 @@ public class FileSinkOperator extends Te
     /**
      * Update OutPath according to taskOutputTempPath.
      */
+    public Path getTaskOutPath(String taskId) {
+      return getOutPath(taskId, this.taskOutputTempPath);
+    }
+
+
+    /**
+     * Update OutPath according to tmpPath.
+     */
     public Path getOutPath(String taskId) {
       return getOutPath(taskId, this.tmpPath);
     }
@@ -182,14 +192,17 @@ public class FileSinkOperator extends Te
     private void commit(FileSystem fs) throws HiveException {
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
+          if (bDynParts && !fs.exists(finalPaths[idx].getParent())) {
+            fs.mkdirs(finalPaths[idx].getParent());
+          }
           if (!fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output to: "
-                + finalPaths[idx]);
+            throw new HiveException("Unable to rename output from: " +
+                outPaths[idx] + " to: " + finalPaths[idx]);
           }
           updateProgress();
         } catch (IOException e) {
-          throw new HiveException(e + "Unable to rename output to: "
-              + finalPaths[idx]);
+          throw new HiveException("Unable to rename output from: " +
+              outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
     }
@@ -425,7 +438,7 @@ public class FileSinkOperator extends Te
         if (isNativeTable) {
           fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
           LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-          fsp.outPaths[filesIdx] = fsp.getOutPath(taskId);
+          fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
           LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
         } else {
           fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
@@ -616,6 +629,7 @@ public class FileSinkOperator extends Te
         }
         fsp2 = new FSPaths(specPath);
         fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
+        fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dpDir);
         createBucketFiles(fsp2);
         valToPaths.put(dpDir, fsp2);
       }

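Each FSPaths now carries two temporary locations: tmpPath ("_tmp.<dir>", where committed task output lands) and taskOutputTempPath ("_task_tmp.<dir>", where in-flight writes go; see getTaskOutPath() above). commit() renames each out path into its final path, and for dynamic partitions it first ensures the destination's parent exists, since partition subdirectories under "_tmp." are no longer created implicitly by the writers. A minimal sketch of that commit step, assuming a Hadoop FileSystem and using IOException where the real code throws HiveException:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CommitSketch {
      /**
       * Rename one task-output file into its final location. With dynamic
       * partitions the destination subdirectory may not exist yet, so
       * create it first, as commit() above now does.
       */
      static void commitOne(FileSystem fs, Path outPath, Path finalPath,
          boolean dynamicPartitions) throws IOException {
        if (dynamicPartitions && !fs.exists(finalPath.getParent())) {
          fs.mkdirs(finalPath.getParent());
        }
        if (!fs.rename(outPath, finalPath)) {
          throw new IOException("Unable to rename output from: " + outPath
              + " to: " + finalPath);
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // Hypothetical paths following the _task_tmp. / _tmp. convention.
        Path out = new Path("/tmp/_task_tmp.demo/_tmp.000000_0");
        Path dst = new Path("/tmp/_tmp.demo/000000_0");
        fs.mkdirs(out.getParent());
        fs.create(out).close();
        commitOne(fs, out, dst, true);
      }
    }
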
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Jul 21 06:46:17 2011
@@ -117,8 +117,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -133,8 +133,8 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -1012,6 +1012,18 @@ public final class Utilities {
   }
 
   private static final String tmpPrefix = "_tmp.";
+  private static final String taskTmpPrefix = "_task_tmp.";
+
+  public static Path toTaskTempPath(Path orig) {
+    if (orig.getName().indexOf(taskTmpPrefix) == 0) {
+      return orig;
+    }
+    return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
+  }
+
+  public static Path toTaskTempPath(String orig) {
+    return toTaskTempPath(new Path(orig));
+  }
 
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
@@ -1211,6 +1223,7 @@ public final class Utilities {
 
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
         + ".intermediate");
     Path finalPath = new Path(specPath);
@@ -1236,6 +1249,7 @@ public final class Utilities {
     } else {
       fs.delete(tmpPath, true);
     }
+    fs.delete(taskTmpPath, true);
   }
 
   /**

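toTaskTempPath() mirrors the existing toTempPath(): both rewrite only the last path component by prefixing it ("_task_tmp." vs. "_tmp."), and both pass already-prefixed paths through unchanged, so the call is idempotent. The end-of-job cleanup now deletes the "_task_tmp." directory alongside the "_tmp." one. A minimal sketch of the transformation (the demo class and main() are illustrative, not part of the patch):

    import org.apache.hadoop.fs.Path;

    public class TaskTempPathDemo {
      private static final String TASK_TMP_PREFIX = "_task_tmp.";

      /**
       * Same shape as the new Utilities.toTaskTempPath(): rewrite only
       * the last path component, and leave already-prefixed paths alone.
       */
      static Path toTaskTempPath(Path orig) {
        if (orig.getName().startsWith(TASK_TMP_PREFIX)) {
          return orig;
        }
        return new Path(orig.getParent(), TASK_TMP_PREFIX + orig.getName());
      }

      public static void main(String[] args) {
        Path spec = new Path("/warehouse/t"); // hypothetical final dir
        System.out.println(toTaskTempPath(spec));                 // /warehouse/_task_tmp.t
        System.out.println(toTaskTempPath(toTaskTempPath(spec))); // unchanged: idempotent
      }
    }
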
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Thu Jul 21 06:46:17 2011
@@ -77,9 +77,6 @@ public class RCFileOutputFormat extends
 
     Path outputPath = getWorkOutputPath(job);
     FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      fs.mkdirs(outputPath);
-    }
     Path file = new Path(outputPath, name);
     CompressionCodec codec = null;
     if (getCompressOutput(job)) {

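The exists()/mkdirs() pair removed here ran every time a record writer was created, i.e. at least once per task; with the temporary output directories now created up front by ExecDriver.createTmpDirs() and BlockMergeTask, those per-writer NameNode round trips became redundant.
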
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Jul 21 06:46:17 2011
@@ -75,6 +75,7 @@ public class BlockMergeTask extends Task
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
 
+  @Override
   public boolean requireLock() {
     return true;
   }
@@ -140,6 +141,17 @@ public class BlockMergeTask extends Task
     }
 
     String outputPath = this.work.getOutputDir();
+    Path tempOutPath = Utilities.toTempPath(new Path(outputPath));
+    try {
+      FileSystem fs = tempOutPath.getFileSystem(job);
+      if (!fs.exists(tempOutPath)) {
+        fs.mkdirs(tempOutPath);
+      }
+    } catch (IOException e) {
+      console.printError("Can't make path " + outputPath + " : " + e.getMessage());
+      return 6;
+    }
+
     RCFileBlockMergeOutputFormat.setMergeOutputPath(job, new Path(outputPath));
 
     job.setOutputKeyClass(NullWritable.class);

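BlockMergeTask does not go through ExecDriver.createTmpDirs(), so it pre-creates the "_tmp." form of its single output directory itself before submitting the merge job, and fails fast with return code 6 if the directory cannot be created.
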
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Thu Jul 21 06:46:17 2011
@@ -61,6 +61,7 @@ public class RCFileMergeMapper extends M
   boolean hasDynamicPartitions = false;
   boolean tmpPathFixed = false;
   Path tmpPath;
+  Path taskTmpPath;
   Path dpPath;
 
   public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
@@ -68,6 +69,7 @@ public class RCFileMergeMapper extends M
   public RCFileMergeMapper() {
   }
 
+  @Override
   public void configure(JobConf job) {
     jc = job;
     hasDynamicPartitions = HiveConf.getBoolVar(job,
@@ -75,7 +77,9 @@ public class RCFileMergeMapper extends M
 
     String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
         .toString();
-    updatePaths(Utilities.toTempPath(specPath));
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+    updatePaths(tmpPath, taskTmpPath);
     try {
       fs = (new Path(specPath)).getFileSystem(job);
       autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
@@ -86,11 +90,12 @@ public class RCFileMergeMapper extends M
     }
   }
 
-  private void updatePaths(Path tmpPath) {
+  private void updatePaths(Path tmpPath, Path taskTmpPath) {
     String taskId = Utilities.getTaskId(jc);
     this.tmpPath = tmpPath;
+    this.taskTmpPath = taskTmpPath;
     finalPath = new Path(tmpPath, taskId);
-    outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+    outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
   }
 
   @Override
@@ -163,9 +168,10 @@ public class RCFileMergeMapper extends M
    *
    * @param inputPath
    * @throws HiveException
+   * @throws IOException
    */
   private void fixTmpPath(Path inputPath)
-      throws HiveException {
+      throws HiveException, IOException {
     dpPath = inputPath;
     Path newPath = new Path(".");
     int inputDepth = inputPath.depth();
@@ -177,9 +183,16 @@ public class RCFileMergeMapper extends M
       inputDepth--;
       inputPath = inputPath.getParent();
     }
-    updatePaths(new Path(tmpPath, newPath));
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
   }
 
+  @Override
   public void close() throws IOException {
     // close writer
     if (outWriter == null) {
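
Taken together, the merge mapper now keeps tmpPath and taskTmpPath in lockstep: updatePaths() derives finalPath (the committed per-task file) from tmpPath and outPath (the in-flight file) from taskTmpPath, and for dynamic partitions fixTmpPath() extends both by the partition suffix while eagerly creating only the "_tmp." side. A minimal sketch of the resulting layout for a hypothetical merge output directory /warehouse/t and task id 000000_0 (prefixLast() is an illustrative helper; the real code uses Utilities.toTempPath()/toTaskTempPath()):

    import org.apache.hadoop.fs.Path;

    public class MergePathsDemo {
      /** Illustrative prefix helper, standing in for the Utilities methods. */
      static Path prefixLast(Path p, String prefix) {
        return new Path(p.getParent(), prefix + p.getName());
      }

      public static void main(String[] args) {
        Path spec = new Path("/warehouse/t"); // hypothetical merge output dir
        String taskId = "000000_0";           // hypothetical task id

        Path tmpPath = prefixLast(spec, "_tmp.");          // committed task files
        Path taskTmpPath = prefixLast(spec, "_task_tmp."); // in-flight task files

        Path finalPath = new Path(tmpPath, taskId);             // as in updatePaths()
        Path outPath = new Path(taskTmpPath, "_tmp." + taskId); // as in updatePaths()

        System.out.println(finalPath); // /warehouse/_tmp.t/000000_0
        System.out.println(outPath);   // /warehouse/_task_tmp.t/_tmp.000000_0
      }
    }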