Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/16 21:04:23 UTC

svn commit: r826039 - in /hadoop/hive/branches/branch-0.4: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java

Author: zshao
Date: Fri Oct 16 19:04:23 2009
New Revision: 826039

URL: http://svn.apache.org/viewvc?rev=826039&view=rev
Log:
HIVE-882. Create a new directory every time for scratch. (Namit Jain via zshao)

Modified:
    hadoop/hive/branches/branch-0.4/CHANGES.txt
    hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
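
For context on the change: the patch touches ExecDriver.execute() in two ways. The per-job output scratch path is now a real subdirectory of hive.exec.scratchdir (the old string concatenation omitted the path separator, so the random suffix was glued onto the scratch directory's own name), and a second randomly named directory is created eagerly and handed to addInputPaths(), so the placeholder files written for empty or missing inputs no longer land directly in the shared scratch root. Both directories are deleted in the cleanup block at the end of execute(). A minimal sketch of the per-invocation layout, using the real Hadoop Path API but a made-up helper name (makeScratchPair):

    // Sketch only: how the two per-invocation scratch paths are derived after HIVE-882.
    // makeScratchPair is hypothetical; Path and File.separator are the real APIs used.
    import java.io.File;
    import java.util.Random;
    import org.apache.hadoop.fs.Path;

    public class ScratchLayoutSketch {
      private static final Random randGen = new Random();

      /** Returns {jobScratchDir, emptyScratchDir}, both children of the shared scratch root. */
      public static Path[] makeScratchPair(String hiveScratchDir) {
        Path jobScratchDir   = new Path(hiveScratchDir + File.separator + randGen.nextInt());
        Path emptyScratchDir = new Path(hiveScratchDir + File.separator + randGen.nextInt());
        return new Path[] { jobScratchDir, emptyScratchDir };
      }
    }

The job's map-reduce output goes under jobScratchDir, while emptyScratchDir only ever holds the zero-row placeholder files that addInputPaths() creates for aliases whose input directories are missing or empty.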

Modified: hadoop/hive/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.4/CHANGES.txt?rev=826039&r1=826038&r2=826039&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.4/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.4/CHANGES.txt Fri Oct 16 19:04:23 2009
@@ -570,6 +570,9 @@
     HIVE-878. Update the hash table entry before flushing in Group By
     hash aggregation (Zheng Shao via namit)
 
+    HIVE-882. Create a new directory every time for scratch.
+    (Namit Jain via zshao)
+
 Release 0.3.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=826039&r1=826038&r2=826039&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Oct 16 19:04:23 2009
@@ -140,10 +140,10 @@
   /**
    * In Hive, when the user control-c's the command line, any running jobs
    * spawned from that command line are best-effort killed.
-   * 
+   *
    * This static constructor registers a shutdown thread to iterate over all the
    * running job kill URLs and do a get on them.
-   * 
+   *
    */
   static {
     if (new org.apache.hadoop.conf.Configuration().getBoolean(
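
The Javadoc in this hunk describes the kill-on-exit mechanism without showing it: the static block (guarded by a boolean configuration flag, visible at the end of the hunk) registers a JVM shutdown hook that walks the kill URLs of still-running jobs and issues an HTTP GET against each one. A self-contained sketch of that pattern follows; the runningJobKillURIs map is an assumption standing in for ExecDriver's actual bookkeeping:

    // Hedged sketch of a "kill running jobs on shutdown" hook.
    import java.io.InputStream;
    import java.net.URL;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class KillOnShutdownSketch {
      // jobId -> JobTracker kill URL, populated as jobs are launched (assumed structure)
      static final Map<String, String> runningJobKillURIs = new ConcurrentHashMap<String, String>();

      static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (String uri : runningJobKillURIs.values()) {
              try {
                InputStream in = new URL(uri).openStream();  // a plain GET is enough
                in.close();                                  // the response body is ignored
              } catch (Exception e) {
                // best-effort: failures during shutdown are swallowed
              }
            }
          }
        });
      }
    }
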
@@ -245,13 +245,13 @@
     int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     long totalInputFileSize = getTotalInputFileSize(job, work);
 
-    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers 
+    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
         + " totalInputFileSize=" + totalInputFileSize);
 
     int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
     reducers = Math.max(1, reducers);
     reducers = Math.min(maxReducers, reducers);
-    return reducers;    
+    return reducers;
   }
 
   /**
@@ -260,13 +260,13 @@
   protected void setNumberOfReducers() throws IOException {
     // this is a temporary hack to fix things that are not fixed in the compiler
     Integer numReducersFromWork = work.getNumReduceTasks();
-    
+
     if(work.getReducer() == null) {
       console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
       work.setNumReduceTasks(Integer.valueOf(0));
     } else {
       if (numReducersFromWork >= 0) {
-        console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks()); 
+        console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
       } else if (job.getNumReduceTasks() > 0) {
         int reducers = job.getNumReduceTasks();
         work.setNumReduceTasks(reducers);
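
The two hunks above cover how the reducer count is settled. estimateNumberOfReducers() is ceiling division clamped to [1, maxReducers]: with bytesPerReducer = 1 GB, maxReducers = 999 and 2.5 GB of input, (2.5 GB + 1 GB - 1) / 1 GB truncates to 3 reducers. setNumberOfReducers() only falls back to that estimate after checking, in order, whether there is a reduce operator at all, whether the plan fixed a non-negative count at compile time, and whether the job conf already requests reducers. A compact restatement of both, with hypothetical method names:

    // Sketch of the reducer-count logic; mirrors the hunks above but omits the logging.
    public class ReducerCountSketch {
      static int estimate(long totalInputFileSize, long bytesPerReducer, int maxReducers) {
        int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
        reducers = Math.max(1, reducers);        // at least one reducer
        return Math.min(maxReducers, reducers);  // never more than the configured cap
      }

      static int resolve(boolean hasReduceOperator, int fromPlan, int fromJobConf, int estimated) {
        if (!hasReduceOperator) return 0;           // no reduce phase at all
        if (fromPlan >= 0)      return fromPlan;    // fixed at compile time
        if (fromJobConf > 0)    return fromJobConf; // explicit mapred.reduce.tasks
        return estimated;                           // input-size based estimate
      }

      public static void main(String[] args) {
        // 2.5 GB at 1 GB per reducer, capped at 999 -> 3
        System.out.println(estimate(2684354560L, 1073741824L, 999));
      }
    }
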
@@ -290,7 +290,7 @@
    * Calculate the total size of input files.
    * @param job the hadoop job conf.
    * @return the total size in bytes.
-   * @throws IOException 
+   * @throws IOException
    */
   public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
     long r = 0;
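
getTotalInputFileSize() simply sums the on-disk size of every input path registered in the plan. A hedged sketch of that summation; a plain collection of paths stands in for the traversal of work.getPathToAliases(), which lies outside this hunk:

    // Minimal sketch: add up the bytes under each input path; unreadable or missing
    // paths contribute zero (empty inputs are patched up later by addInputPath).
    import java.io.IOException;
    import java.util.Collection;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class InputSizeSketch {
      public static long totalInputFileSize(JobConf job, Collection<String> inputPaths) throws IOException {
        long total = 0;
        for (String p : inputPaths) {
          Path path = new Path(p);
          try {
            FileSystem fs = path.getFileSystem(job);
            total += fs.getContentSummary(path).getLength();
          } catch (IOException e) {
            // missing directory: count as zero bytes
          }
        }
        return total;
      }
    }
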
@@ -330,7 +330,28 @@
 
 
     String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
-    Path jobScratchDir = new Path(hiveScratchDir + Utilities.randGen.nextInt());
+    String jobScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+    Path   jobScratchDir = new Path(jobScratchDirStr);
+    String emptyScratchDirStr = null;
+    Path   emptyScratchDir    = null;
+
+    int numTries = 3;
+    while (numTries > 0) {
+      emptyScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+      emptyScratchDir = new Path(emptyScratchDirStr);
+
+      try {
+        FileSystem fs = emptyScratchDir.getFileSystem(job);
+        fs.mkdirs(emptyScratchDir);
+        break;
+      } catch (Exception e) {
+        if (numTries > 0)
+          numTries--;
+        else
+          throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage());
+      }
+    }
+
     FileOutputFormat.setOutputPath(job, jobScratchDir);
     job.setMapperClass(ExecMapper.class);
 
@@ -350,7 +371,7 @@
     String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
     if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
-      String allJars = 
+      String allJars =
         StringUtils.isNotBlank(auxJars)
         ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
         : addedJars;
@@ -363,13 +384,13 @@
     if (StringUtils.isNotBlank(addedFiles)) {
       initializeFiles("tmpfiles", addedFiles);
     }
-    
+
     int returnVal = 0;
     RunningJob rj = null, orig_rj = null;
     boolean success = false;
 
     try {
-      addInputPaths(job, work, hiveScratchDir);
+      addInputPaths(job, work, emptyScratchDirStr);
       Utilities.setMapRedWork(job, work);
 
       // remove the pwd from conf file so that job tracker doesn't show this logs
@@ -430,6 +451,7 @@
       try {
         FileSystem fs = jobScratchDir.getFileSystem(job);
         fs.delete(jobScratchDir, true);
+        fs.delete(emptyScratchDir, true);
         if (returnVal != 0 && rj != null) {
           rj.killJob();
         }
@@ -503,7 +525,7 @@
       BasicConfigurator.resetConfiguration();
       BasicConfigurator.configure(new NullAppender());
     }
-    
+
     if (planFileName == null) {
       System.err.println("Must specify Plan File Name");
       printUsage();
@@ -656,23 +678,23 @@
    * Handle a empty/null path for a given alias
    */
   private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths, boolean isEmptyPath,
-                           String alias) 
+                           String alias)
     throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
 
     // The input file does not exist, replace it by a empty file
     Class<? extends HiveOutputFormat> outFileFormat = null;
- 
-    if (isEmptyPath) 
+
+    if (isEmptyPath)
       outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
     else
       outFileFormat = (Class<? extends HiveOutputFormat>)(HiveSequenceFileOutputFormat.class);
-    
+
     String newFile = hiveScratchDir + File.separator + (++numEmptyPaths);
     Path newPath = new Path(newFile);
     LOG.info("Changed input file to " + newPath.toString());
-    
+
     // toggle the work
     LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
     if (isEmptyPath) {
@@ -688,7 +710,7 @@
     }
 
     work.setPathToAliases(pathToAliases);
-    
+
     LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
     if (isEmptyPath) {
       pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
@@ -701,7 +723,7 @@
       pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
     }
     work.setPathToPartitionInfo(pathToPartitionInfo);
-    
+
     String onefile = newPath.toString();
     RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null);
     recWriter.close(false);
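
addInputPath() in the hunks above is the consumer of the new directory: for an alias whose input directory is missing or empty, it writes a zero-row file under the scratch directory (through the table's HiveOutputFormat, so the file is readable by the matching input format) and then repoints pathToAliases and pathToPartitionInfo at that file, so the job still gets a split for the alias. A much-reduced sketch of the rewiring step; it creates a plain empty file instead of going through getHiveRecordWriter, and leaves out the partitionDesc bookkeeping:

    // Hedged sketch: stand in a placeholder file for a missing/empty input path and
    // move the alias mapping onto it. The real code also updates pathToPartitionInfo.
    import java.io.File;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class EmptyInputSketch {
      public static Path addPlaceholder(JobConf job, String emptyScratchDir, int seq, String alias,
          String oldPath, LinkedHashMap<String, ArrayList<String>> pathToAliases) throws Exception {
        Path newPath = new Path(emptyScratchDir + File.separator + seq);
        FileSystem fs = newPath.getFileSystem(job);
        fs.create(newPath).close();                  // zero-length placeholder file
        ArrayList<String> aliases;
        if (oldPath != null) {
          aliases = pathToAliases.remove(oldPath);   // carry the empty directory's aliases over
        } else {
          aliases = new ArrayList<String>();         // the alias had no path at all
          aliases.add(alias);
        }
        pathToAliases.put(newPath.toUri().toString(), aliases);
        return newPath;
      }
    }
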
@@ -719,13 +741,13 @@
       LOG.info("Processing alias " + oneAlias);
       List<String> emptyPaths     = new ArrayList<String>();
 
-      // The alias may not have any path 
+      // The alias may not have any path
       String path = null;
       for (String onefile : work.getPathToAliases().keySet()) {
         List<String> aliases = work.getPathToAliases().get(onefile);
         if (aliases.contains(oneAlias)) {
           path = onefile;
-      
+
           // Multiple aliases can point to the same path - it should be processed only once
           if (pathsProcessed.contains(path))
             continue;