You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2013/06/04 01:41:58 UTC

svn commit: r1489226 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: Context.java exec/TaskRunner.java

Author: navis
Date: Mon Jun  3 23:41:58 2013
New Revision: 1489226

URL: http://svn.apache.org/r1489226
Log:
HIVE-4620 MR temp directory conflicts in case of parallel execution mode (Prasad Mujumdar via Navis)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1489226&r1=1489225&r2=1489226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Mon Jun  3 23:41:58 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
@@ -171,10 +172,11 @@ public class Context {
                                boolean mkdir, String scratchDir) {
 
     String fileSystem =  scheme + ":" + authority;
-    String dir = fsScratchDirs.get(fileSystem);
+    String dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID());
 
     if (dir == null) {
-      Path dirPath = new Path(scheme, authority, scratchDir);
+      Path dirPath = new Path(scheme, authority,
+          scratchDir + "-" + TaskRunner.getTaskRunnerID());
       if (mkdir) {
         try {
           FileSystem fs = dirPath.getFileSystem(conf);
@@ -191,7 +193,7 @@ public class Context {
         }
       }
       dir = dirPath.toString();
-      fsScratchDirs.put(fileSystem, dir);
+      fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
 
     }
     return dir;
@@ -228,9 +230,10 @@ public class Context {
     try {
       Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf);
       URI uri = dir.toUri();
-      return getScratchDir(uri.getScheme(), uri.getAuthority(),
+      String newScratchDir = getScratchDir(uri.getScheme(), uri.getAuthority(),
                            !explain, uri.getPath());
-
+      LOG.info("New scratch dir is " + newScratchDir);
+      return newScratchDir;
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (IllegalArgumentException e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1489226&r1=1489225&r2=1489226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Mon Jun  3 23:41:58 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hive.ql.session.SessionState;
 
@@ -30,6 +31,13 @@ public class TaskRunner extends Thread {
   protected Task<? extends Serializable> tsk;
   protected TaskResult result;
   protected SessionState ss;
+  private static AtomicLong taskCounter = new AtomicLong(0);
+  private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
+    @Override
+    protected Long initialValue() {
+      return taskCounter.incrementAndGet();
+    }
+  };
 
   public TaskRunner(Task<? extends Serializable> tsk, TaskResult result) {
     this.tsk = tsk;
@@ -61,4 +69,7 @@ public class TaskRunner extends Thread {
     result.setExitVal(exitVal);
   }
 
+  public static long getTaskRunnerID () {
+    return taskRunnerID.get();
+  }
 }