You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/19 05:51:48 UTC

git commit: TEZ-134. Untangle MR configuration in YarnTezDagChild. (sseth)

Updated Branches:
  refs/heads/TEZ-1 0d5027d00 -> be6d4bc09


TEZ-134. Untangle MR configuration in YarnTezDagChild. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/be6d4bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/be6d4bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/be6d4bc0

Branch: refs/heads/TEZ-1
Commit: be6d4bc096d9bae363cbcf1a3dc5ec67b057507d
Parents: 0d5027d
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat May 18 20:51:06 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat May 18 20:51:06 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezJobConfig.java   |   19 +-
 .../org/apache/hadoop/mapred/YarnTezDagChild.java  |  291 ++++-----------
 .../main/java/org/apache/tez/engine/api/Task.java  |    2 +
 .../apache/tez/common/TezEngineTaskContext.java    |    5 +
 .../tez/engine/common/shuffle/impl/Shuffle.java    |    2 +-
 .../tez/engine/common/sort/impl/TezMerger.java     |    2 +-
 .../task/local/output/TezLocalTaskOutputFiles.java |    4 +-
 .../task/local/output/TezTaskOutputFiles.java      |    2 +-
 .../apache/tez/engine/runtime/RuntimeUtils.java    |   22 +-
 .../org/apache/tez/engine/task/RuntimeTask.java    |   21 +-
 .../org/apache/hadoop/mapred/LocalJobRunner.java   |    8 +-
 .../apache/tez/mapreduce/task/MRRuntimeTask.java   |  262 +++++++++++++
 .../apache/tez/mapreduce/processor/MapUtils.java   |   50 +++
 .../mapreduce/processor/map/TestMapProcessor.java  |   23 +-
 .../processor/reduce/TestReduceProcessor.java      |   23 +-
 15 files changed, 477 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 4a7abea..0d6f6be 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -62,11 +62,16 @@ public class TezJobConfig {
   public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000; 
 
   /**
-   * 
+   * List of directories avialble to the engine. 
    */
-  public static final String LOCAL_DIR = "tez.engine.local.dir";
-  public static final String DEFAULT_LOCAL_DIR = "/tmp";
-
+  public static final String LOCAL_DIRS = "tez.engine.local.dirs";
+  public static final String DEFAULT_LOCAL_DIRS = "/tmp";
+  
+  /**
+   * One local dir for the speicfic job.
+   */
+  public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir";
+  
   /**
    * The directory which contains the localized files for this task.
    */
@@ -74,6 +79,8 @@ public class TezJobConfig {
   public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
   public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
   
+  public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir";
+  
   /**
    * 
    */
@@ -299,4 +306,8 @@ public class TezJobConfig {
    * credentials.
    */
   public static final String DAG_CREDENTIALS_BINARY =  "tez.dag.credentials.binary";
+  
+  
+  public static final String APPLICATION_ATTEMPT_ID =
+      "tez.job.application.attempt.id";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 2295b6a..6fd6eff 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -20,17 +20,9 @@ package org.apache.hadoop.mapred;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,16 +31,8 @@ import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -72,16 +56,15 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
-import org.apache.tez.mapreduce.processor.MRTask;
 
 /**
- * The main() for TEZ MapReduce task processes.
+ * The main() for TEZ Task processes.
  */
 public class YarnTezDagChild {
 
@@ -91,11 +74,9 @@ public class YarnTezDagChild {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     LOG.debug("Child starting");
 
-    DeprecatedKeys.init();
-    
-    final JobConf defaultConf = new JobConf();
-    // HACK Eventually load the DagConf for security etc setup.
-//    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    final Configuration defaultConf = new Configuration();
+    // Security settings will be loaded based on core-site and core-default. Don't
+    // depend on the jobConf for this.
     UserGroupInformation.setConfiguration(defaultConf);
 
     String host = args[0];
@@ -121,9 +102,10 @@ public class YarnTezDagChild {
     // Create TaskUmbilicalProtocol as actual task owner.
     UserGroupInformation taskOwner =
       UserGroupInformation.createRemoteUser(appID.toString());
-    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
-    SecurityUtil.setTokenService(jt, address);
-    taskOwner.addToken(jt);
+
+    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jobToken, address);
+    taskOwner.addToken(jobToken);
     final TezTaskUmbilicalProtocol umbilical =
       taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
       @Override
@@ -142,9 +124,8 @@ public class YarnTezDagChild {
     ContainerTask containerTask = null;
     UserGroupInformation childUGI = null;
     TezTaskAttemptID taskAttemptId = null;
-    MRTask task = null;
     ContainerContext containerContext = new ContainerContext(containerId, pid);
-    
+
     try {
       while (true) {
         // poll for new task
@@ -165,19 +146,19 @@ public class YarnTezDagChild {
         }
         taskContext = (TezEngineTaskContext) containerTask
             .getTezEngineTaskContext();
-        LOG.info("XXXX: New container task context:"
+        LOG.info("DEBUG: New container task context:"
                 + taskContext.toString());
 
         taskAttemptId = taskContext.getTaskAttemptId();
 
         final Task t = createAndConfigureTezTask(taskContext, umbilical,
-            credentials, jt,
+            credentials, jobToken,
             containerId.getApplicationAttemptId().getAttemptId());
-        task = (MRTask) t.getProcessor();
-        final JobConf job = task.getConf();
 
-        // Initiate Java VM metrics
-        JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
+        final Configuration conf = ((RuntimeTask)t).getConfiguration();
+
+        // TODO Initiate Java VM metrics
+        // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
         childUGI = UserGroupInformation.createRemoteUser(System
             .getenv(ApplicationConstants.Environment.USER.toString()));
         // Add tokens to new user so that it may execute its task correctly.
@@ -186,7 +167,7 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            runTezTask(t, umbilical, job); // run the task
+            runTezTask(t, umbilical, conf); // run the task
             return null;
           }
         });
@@ -196,34 +177,6 @@ public class YarnTezDagChild {
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
       umbilical.fsError(taskAttemptId, e.getMessage());
-    } catch (Exception exception) {
-      LOG.warn("Exception running child : "
-          + StringUtils.stringifyException(exception));
-      try {
-        if (task != null) {
-          // do cleanup for the task
-          if (childUGI == null) { // no need to job into doAs block
-            task.taskCleanup(umbilical);
-          } else {
-            final MRTask taskFinal = task;
-            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                taskFinal.taskCleanup(umbilical);
-                return null;
-              }
-            });
-          }
-        }
-      } catch (Exception e) {
-        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
-      }
-      // Report back any failures, for diagnostic purposes
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      exception.printStackTrace(new PrintStream(baos));
-      if (taskAttemptId != null) {
-        umbilical.fatalError(taskAttemptId, baos.toString());
-      }
     } catch (Throwable throwable) {
       LOG.fatal("Error running child : "
     	        + StringUtils.stringifyException(throwable));
@@ -249,29 +202,36 @@ public class YarnTezDagChild {
    * out an output directory.
    * @throws IOException 
    */
-  private static void configureLocalDirs(JobConf job) throws IOException {
+  /**
+   * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these
+   * dirs.
+   */
+  
+  private static void configureLocalDirs(Configuration conf) throws IOException {
     String[] localSysDirs = StringUtils.getTrimmedStrings(
         System.getenv(Environment.LOCAL_DIRS.name()));
-    job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
-    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
+    conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
         System.getenv(Environment.PWD.name()));
-    LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
-        job.get(TezJobConfig.LOCAL_DIR));
+    
+    LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " +
+        conf.get(TezJobConfig.LOCAL_DIRS));
     LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
-        + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+        + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
+    
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
     Path workDir = null;
     // First, try to find the JOB_LOCAL_DIR on this host.
     try {
-      workDir = lDirAlloc.getLocalPathToRead("work", job);
+      workDir = lDirAlloc.getLocalPathToRead("work", conf);
     } catch (DiskErrorException e) {
       // DiskErrorException means dir not found. If not found, it will
       // be created below.
     }
     if (workDir == null) {
       // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
-      workDir = lDirAlloc.getLocalPathForWrite("work", job);
-      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      workDir = lDirAlloc.getLocalPathForWrite("work", conf);
+      FileSystem lfs = FileSystem.getLocal(conf).getRaw();
       boolean madeDir = false;
       try {
         madeDir = lfs.mkdirs(workDir);
@@ -281,155 +241,29 @@ public class YarnTezDagChild {
         // at the same time. If this task loses the race, it's okay because
         // the directory already exists.
         madeDir = true;
-        workDir = lDirAlloc.getLocalPathToRead("work", job);
+        workDir = lDirAlloc.getLocalPathToRead("work", conf);
       }
       if (!madeDir) {
           throw new IOException("Mkdirs failed to create "
               + workDir.toString());
       }
     }
-    // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc.
-    job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
-  }
-
-  private static JobConf configureTask(MRTask task, Credentials credentials,
-      Token<JobTokenIdentifier> jt, int appAttemptId)
-          throws IOException, InterruptedException {
-    JobConf job = task.getConf();
-    
-    // Set it in conf, so as to be able to be used the the OutputCommitter.
-    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
-
-    // set tcp nodelay
-    job.setBoolean("ipc.client.tcpnodelay", true);
-    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
-        YarnOutputFiles.class, MapOutputFile.class);
-    // set the jobTokenFile into task
-    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
-
-    task.setJobTokenSecret(sk);
-//    task.setJobTokenSecret(
-//        JobTokenSecretManager.createSecretKey(jt.getPassword()));
-
-    // setup the child's MRConfig.LOCAL_DIR.
-    configureLocalDirs(job);
-
-    // setup the child's attempt directories
-    // Do the task-type specific localization
-    task.localizeConfiguration(job);
-
-    // Set up the DistributedCache related configs
-    setupDistributedCacheConfig(job);
-
-    // Overwrite the localized task jobconf which is linked to in the current
-    // work-dir.
-    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
-    writeLocalJobFile(localTaskFile, job);
-    task.setConf(job);
-    return job;
-  }
-
-  /**
-   * Set up the DistributedCache related configs to make
-   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
-   * and
-   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
-   * working.
-   * @param job
-   * @throws IOException
-   */
-  private static void setupDistributedCacheConfig(final JobConf job)
-      throws IOException {
-
-    String localWorkDir = System.getenv("PWD");
-    //        ^ ^ all symlinks are created in the current work-dir
-
-    // Update the configuration object with localized archives.
-    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
-    if (cacheArchives != null) {
-      List<String> localArchives = new ArrayList<String>();
-      for (int i = 0; i < cacheArchives.length; ++i) {
-        URI u = cacheArchives[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localArchives.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
-            .arrayToString(localArchives.toArray(new String[localArchives
-                .size()])));
-      }
-    }
-
-    // Update the configuration object with localized files.
-    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
-    if (cacheFiles != null) {
-      List<String> localFiles = new ArrayList<String>();
-      for (int i = 0; i < cacheFiles.length; ++i) {
-        URI u = cacheFiles[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localFiles.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALFILES,
-            StringUtils.arrayToString(localFiles
-                .toArray(new String[localFiles.size()])));
-      }
-    }
-  }
-
-  private static final FsPermission urw_gr =
-    FsPermission.createImmutable((short) 0640);
-
-  /**
-   * Write the task specific job-configuration file.
-   * @throws IOException
-   */
-  private static void writeLocalJobFile(Path jobFile, JobConf conf)
-      throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(jobFile);
-    OutputStream out = null;
-    try {
-      out = FileSystem.create(localFs, jobFile, urw_gr);
-      conf.writeXml(out);
-    } finally {
-      IOUtils.cleanup(LOG, out);
-    }
+    conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
   }
-
+  
   private static Task createAndConfigureTezTask(
-      TezEngineTaskContext taskContext,
-      TezTaskUmbilicalProtocol master,
-      Credentials credentials, Token<JobTokenIdentifier> jt,
-      int appAttemptId)
-      throws IOException, InterruptedException {
+      TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
+      Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
+      int appAttemptId) throws IOException, InterruptedException {
 
-    Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE);
-    Configuration conf = MultiStageMRConfigUtil.getConfForVertex(jConf,
-        taskContext.getVertexName());
+    Configuration conf = new Configuration();
+    // set tcp nodelay
+    conf.setBoolean("ipc.client.tcpnodelay", true);
+    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+    
+    configureLocalDirs(conf);
+    
     
-    // TOOD Post MRR
-    // A single file per vertex will likely be a better solution. Does not
-    // require translation - client can take care of this. Will work independent
-    // of whether the configuration is for intermediate tasks or not. Has the
-    // overhead of localizing multiple files per job - i.e. the client would
-    // need to write these files to hdfs, add them as local resources per
-    // vertex. A solution like this may be more practical once it's possible to
-    // submit configuration parameters to the AM and effectively tasks via RPC.
-
-    // TODO Avoid all this extra config manipulation.
-    // FIXME we need I/O/p level configs to be used in init below
-    final JobConf job = new JobConf(conf);
-    job.setCredentials(credentials);
-
     // FIXME need Input/Output vertices else we have this hack
     if (taskContext.getInputSpecList().isEmpty()) {
       taskContext.getInputSpecList().add(
@@ -440,26 +274,37 @@ public class YarnTezDagChild {
           new OutputSpec("null", 0, SimpleOutput.class.getName()));
     }
     Task t = RuntimeUtils.createRuntimeTask(taskContext);
-    
+    t.initialize(conf, master);
+
     // FIXME wrapper should initialize all of processor, inputs and outputs
     // Currently, processor is inited via task init
     // and processor then inits inputs and outputs
-    t.initialize(job, master);
-    
-    MRTask task = (MRTask)t.getProcessor();
-    configureTask(task, credentials, jt, appAttemptId);
-    
     return t;
   }
   
   private static void runTezTask(
-      Task t, TezTaskUmbilicalProtocol master, JobConf job) 
+      Task t, TezTaskUmbilicalProtocol master, Configuration conf) 
   throws IOException, InterruptedException {
     // use job-specified working directory
-    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+    FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
     
     // Run!
     t.run();
     t.close();
   }
+  
+  private static Path getWorkingDirectory(Configuration conf) {
+    String name = conf.get(JobContext.WORKING_DIR);
+    if (name != null) {
+      return new Path(name);
+    } else {
+      try {
+        Path dir = FileSystem.get(conf).getWorkingDirectory();
+        conf.set(JobContext.WORKING_DIR, dir.toString());
+        return dir;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
index ea50a06..c08eda6 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
@@ -74,4 +74,6 @@ public interface Task {
    */
   public void close() throws IOException, InterruptedException;
   
+  
+  public Configuration getConfiguration();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
index 5823de6..93aeb0b 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
@@ -55,6 +55,11 @@ public class TezEngineTaskContext extends TezTaskContext {
     this.processorName = processorName;
   }
 
+  public String getRuntimeName() {
+    // FIXME. Add this to the DAG configuration, and fetch from there.
+    return "org.apache.tez.mapreduce.task.MRRuntimeTask";
+  }
+
   public String getProcessorName() {
     return processorName;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 6a065de..202abd6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -80,7 +80,7 @@ public class Shuffle implements ExceptionReporter {
     
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
-        new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+        new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
     
     copyPhase = this.runningTaskContext.getProgress().addPhase("copy", 0.33f);
     mergePhase = this.runningTaskContext.getProgress().addPhase("merge", 0.66f);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
index 829495b..eeea764 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
@@ -57,7 +57,7 @@ public class TezMerger {
 
   // Local directories
   private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
   public static
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
index ccfbd78..69484af 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.records.TezTaskID;
 public class TezLocalTaskOutputFiles extends TezTaskOutput {
 
   private LocalDirAllocator lDirAlloc =
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
   public TezLocalTaskOutputFiles() {
   }
@@ -223,7 +223,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   }
 
   private String[] getLocalDirs() throws IOException {
-    return getConf().getStrings(TezJobConfig.LOCAL_DIR);
+    return getConf().getStrings(TezJobConfig.LOCAL_DIRS);
   }
 
   @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index 1bd65e2..5fb6519 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -56,7 +56,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
 
   // assume configured to $localdir/usercache/$user/appcache/$appId
   private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
   private Path getAttemptOutputDir() {
     LOG.info("DEBUG: getAttemptOutputDir: "

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
index e04d405..900c2f0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
@@ -66,6 +66,7 @@ public class RuntimeUtils {
         + ", Processor: " + taskContext.getProcessorName()
         + ", InputCount=" + taskContext.getInputSpecList().size()
         + ", OutputCount=" + taskContext.getOutputSpecList().size());
+
     RuntimeTask t = null;
     try {
       Class<?> processorClazz =
@@ -108,11 +109,30 @@ public class RuntimeUtils {
           outputs[i] = output;
         }
       }
-      t = new RuntimeTask(processor, inputs, outputs);
+      // t = new RuntimeTask(taskContext, processor, inputs, outputs);
+      t = createRuntime(taskContext, processor, inputs, outputs);
     } catch (ClassNotFoundException e) {
       throw new YarnException("Unable to initialize RuntimeTask, context="
           + taskContext, e);
     }
     return t;
   }
+  
+  private static RuntimeTask createRuntime(TezEngineTaskContext taskContext,
+      Processor processor, Input[] inputs, Output[] outputs) {
+    try {
+      // TODO Change this to use getNewInstance
+      Class<?> runtimeClazz = Class.forName(taskContext.getRuntimeName());
+      Constructor<?> ctor = runtimeClazz.getConstructor(
+          TezEngineTaskContext.class, Processor.class, Input[].class,
+          Output[].class);
+      ctor.setAccessible(true);
+      return (RuntimeTask) ctor.newInstance(taskContext, processor, inputs, outputs);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Unable to load runtimeClass: "
+          + taskContext.getRuntimeName(), e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
index 3e2c6f2..871f3ba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.task;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.api.Output;
@@ -28,17 +29,19 @@ import org.apache.tez.engine.api.Task;
 
 public class RuntimeTask implements Task {
 
-  private final Input[] inputs;
-  private final Output[] outputs;
-  private final Processor processor;
+  protected final Input[] inputs;
+  protected final Output[] outputs;
+  protected final Processor processor;
   
-  private Configuration conf;
-  private Master master;
+  protected TezEngineTaskContext taskContext;
+  protected Configuration conf;
+  protected Master master;
   
-  public RuntimeTask(
+  public RuntimeTask(TezEngineTaskContext taskContext,
       Processor processor,
       Input[] inputs,
       Output[] outputs) {
+    this.taskContext = taskContext;
     this.inputs = inputs;
     this.processor = processor;
     this.outputs = outputs;
@@ -74,7 +77,13 @@ public class RuntimeTask implements Task {
 
   public void close() throws IOException, InterruptedException {
     // NOTE: Allow processor to close input/output
+    // This can be changed to close input/output since MRRuntimeTask is used for
+    // MR jobs, which changes the order.
     processor.close();
   }
 
+  @Override
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 63472d2..5c181e2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -465,7 +465,7 @@ public class LocalJobRunner implements ClientProtocol {
                 LOG.info("XXX mapId: " + i + 
                     " LOCAL_DIR = " + 
                     mapOutputFiles.get(mapId).getConf().get(
-                        TezJobConfig.LOCAL_DIR));
+                        TezJobConfig.LOCAL_DIRS));
                 Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
                 TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
                 localOutputFile.setConf(localConf);
@@ -902,7 +902,7 @@ public class LocalJobRunner implements ClientProtocol {
       TaskAttemptID taskAttemptID, String user, JobConf conf) {
     String[] localDirs = 
         conf.getTrimmedStrings(
-            TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR);
+            TezJobConfig.LOCAL_DIRS, TezJobConfig.DEFAULT_LOCAL_DIRS);
     String jobId = taskAttemptID.getJobID().toString();
     String taskId = taskAttemptID.getTaskID().toString();
     boolean isCleanup = false;
@@ -913,9 +913,9 @@ public class LocalJobRunner implements ClientProtocol {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
           + getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
-    LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID + 
+    LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID + 
         " is " + childMapredLocalDir);
-    conf.set(TezJobConfig.LOCAL_DIR, childMapredLocalDir.toString());
+    conf.set(TezJobConfig.LOCAL_DIRS, childMapredLocalDir.toString());
     conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
         TezLocalTaskOutputFiles.class, TezTaskOutput.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
new file mode 100644
index 0000000..6091d4b
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.task;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.task.impl.YarnOutputFiles;
+
+@SuppressWarnings("deprecation")
+public class MRRuntimeTask extends RuntimeTask {
+
+  private static final Log LOG = LogFactory.getLog(MRRuntimeTask.class);
+
+  private MRTask mrTask;
+
+  public MRRuntimeTask(TezEngineTaskContext taskContext, Processor processor,
+      Input[] inputs, Output[] outputs) {
+    super(taskContext, processor, inputs, outputs);
+  }
+
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+
+    DeprecatedKeys.init();
+
+    Configuration mrConf = new Configuration(conf);
+    mrConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    Configuration taskConf = MultiStageMRConfigUtil.getConfForVertex(mrConf,
+        taskContext.getVertexName());
+
+    // TODO Avoid all this extra config manipulation.
+    // FIXME we need I/O/p level configs to be used in init below
+
+    // TOOD Post MRR
+    // A single file per vertex will likely be a better solution. Does not
+    // require translation - client can take care of this. Will work independent
+    // of whether the configuration is for intermediate tasks or not. Has the
+    // overhead of localizing multiple files per job - i.e. the client would
+    // need to write these files to hdfs, add them as local resources per
+    // vertex. A solution like this may be more practical once it's possible to
+    // submit configuration parameters to the AM and effectively tasks via RPC.
+
+    final JobConf job = new JobConf(taskConf);
+
+    MRTask mrTask = (MRTask) getProcessor();
+    this.mrTask = mrTask;
+    configureMRTask(job, mrTask);
+
+    this.conf = job;
+    this.master = master;
+
+    // NOTE: Allow processor to initialize input/output
+    processor.initialize(this.conf, this.master);
+  }
+
+  @Override
+  public void run() throws IOException, InterruptedException {
+    TezTaskUmbilicalProtocol umbilical = (TezTaskUmbilicalProtocol) master;
+    try {
+      super.run();
+    } catch (FSError e) {
+      throw e;
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (mrTask != null) {
+          mrTask.taskCleanup(umbilical);
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleanup up: " + StringUtils.stringifyException(e));
+      }
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskContext.getTaskAttemptId() != null) {
+        umbilical.fatalError(taskContext.getTaskAttemptId(), baos.toString());
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+    // NOTE: Allow processor to close input/output
+    processor.close();
+  }
+
+  private static void configureMRTask(JobConf job, MRTask task)
+      throws IOException, InterruptedException {
+
+    Credentials credentials = UserGroupInformation.getCurrentUser()
+        .getCredentials();
+    job.setCredentials(credentials);
+    // TODO Can this be avoided all together. Have the MRTezOutputCommitter use
+    // the Tez parameter.
+    // TODO This could be fetched from the env if YARN is setting it for all
+    // Containers.
+    // Set it in conf, so as to be able to be used the the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        job.getInt(TezJobConfig.APPLICATION_ATTEMPT_ID, -1));
+
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
+        MapOutputFile.class); // MR
+
+    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+    if (jobToken != null) {
+      // Will MR ever run without a job token.
+      SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken
+          .getPassword());
+      task.setJobTokenSecret(sk);
+    } else {
+      LOG.warn("No job token set");
+    }
+
+    job.set(MRJobConfig.JOB_LOCAL_DIR, job.get(TezJobConfig.JOB_LOCAL_DIR));
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(
+        job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+        MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+
+    task.setConf(job);
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)} and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)} working.
+   * 
+   * @param job
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    String localWorkDir = System.getenv(System.getenv(Environment.PWD.name()));
+    // ^ ^ all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name = new Path((null == u.getFragment()) ? p.getName()
+            : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name = new Path((null == u.getFragment()) ? p.getName()
+            : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
+            .arrayToString(localFiles.toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
+  private static final FsPermission urw_gr = FsPermission
+      .createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file.
+   * 
+   * @throws IOException
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 0b6bc5f..48a1113 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -24,14 +24,17 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -40,6 +43,8 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
@@ -55,6 +60,51 @@ public class MapUtils {
 
   private static final Log LOG = LogFactory.getLog(MapUtils.class);
   
+  public static void configureLocalDirs(Configuration conf, String localDir)
+      throws IOException {
+    String[] localSysDirs = new String[1];
+    localSysDirs[0] = localDir;
+
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
+    conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+        System.getenv(Environment.PWD.name()));
+
+    LOG.info(TezJobConfig.LOCAL_DIRS + " for child: "
+        + conf.get(TezJobConfig.LOCAL_DIRS));
+    LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
+        + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
+
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", conf);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", conf);
+      FileSystem lfs = FileSystem.getLocal(conf).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", conf);
+      }
+      if (!madeDir) {
+        throw new IOException("Mkdirs failed to create " + workDir.toString());
+      }
+    }
+    conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
+  }
+  
   private static InputSplit 
   createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) 
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index cda15fb..1e5fe9b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -55,36 +55,40 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class TestMapProcessor {
   
   private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);  
   
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null; 
+  private static Path workDir = null;
   static {
     try {
       defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
+      workDir =
+          new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+                   "TestMapProcessor").makeQualified(localFs);
+      MapUtils.configureLocalDirs(defaultConf, workDir.toString());
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  @SuppressWarnings("deprecation")
-  private static Path workDir =
-    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
-             "TestMapProcessor").makeQualified(localFs);
+  
+
 
   TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
 
   public void setUpJobConf(JobConf job) {
-    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
     job.setNumReduceTasks(1);
   }
-  
+
   @Before
   @After
   public void cleanup() throws Exception {
@@ -98,15 +102,18 @@ public class TestMapProcessor {
     setUpJobConf(jobConf);
     TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
     mapOutputs.setConf(jobConf);
-    
+
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
     Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         vertexName);
     
     JobConf job = new JobConf(stageConf);
-    
+
     job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
+    
 
     MapUtils.runMapProcessor(localFs, workDir, job, 0,
         new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 69571e1..d17b0be 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -56,27 +56,30 @@ import org.junit.Before;
 import org.junit.Test;
 
 
+@SuppressWarnings("deprecation")
 public class TestReduceProcessor {
   
   private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
 
   private static JobConf defaultConf = new JobConf();
-  private static FileSystem localFs = null; 
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
   static {
     try {
       defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
+      workDir =
+          new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+                   "TestReduceProcessor").makeQualified(localFs);
+      
+      MapUtils.configureLocalDirs(defaultConf, workDir.toString());
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  @SuppressWarnings("deprecation")
-  private static Path workDir =
-    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
-             "TestReduceProcessor").makeQualified(localFs);
 
   public void setUpJobConf(JobConf job) {
-    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
@@ -101,6 +104,7 @@ public class TestReduceProcessor {
     mapOutputs.setConf(jobConf);
     
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
     Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         mapVertexName);
     
@@ -109,6 +113,7 @@ public class TestReduceProcessor {
     mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
+    
     // Run a map
     MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
         new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
@@ -124,6 +129,8 @@ public class TestReduceProcessor {
         reduceVertexName);
     JobConf reduceConf = new JobConf(reduceStageConf);
     reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
+    reduceConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
     FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
     
     // Now run a reduce
@@ -147,8 +154,8 @@ public class TestReduceProcessor {
             .toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
     Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
 
-    @SuppressWarnings("deprecation")
-    SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(localFs,
+        reduceOutputFile, reduceConf);
 
     LongWritable key = new LongWritable();
     Text value = new Text();