You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2019/05/06 17:52:38 UTC

[tez] branch master updated (0f71b0b -> 82c1d75)

This is an automated email from the ASF dual-hosted git repository.

sseth pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git.


    from 0f71b0b  TEZ-4062. Speculative attempt scheduling should be aborted when Task has completed
     new 4bbd3c2  Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"
     new 82c1d75  TEZ-1348. Allow Tez local mode to run against filesystems other than local FS. (Todd Lipcon via sseth)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tez-dag/src/main/java/org/apache/tez/client/LocalClient.java  |  2 --
 .../src/test/java/org/apache/tez/test/TestLocalMode.java      | 11 ++++++++---
 2 files changed, 8 insertions(+), 5 deletions(-)


[tez] 01/02: Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"

Posted by ss...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sseth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 4bbd3c2e7600752cfe0d074d61e00e12ea0ee748
Author: Siddharth Seth <ss...@apache.org>
AuthorDate: Mon May 6 10:51:43 2019 -0700

    Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"
    
    This reverts commit 46b4004d97dd2f2cde491a93abcdd48c9b82f68e.
---
 .../org/apache/tez/common/TezUtilsInternal.java    | 41 +++++++++-
 .../java/org/apache/tez/client/LocalClient.java    | 53 ++++++------
 .../org/apache/tez/examples/TezExampleBase.java    |  3 +-
 .../java/org/apache/tez/test/TestLocalMode.java    | 93 +++++++---------------
 4 files changed, 97 insertions(+), 93 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index adcae8a..5d7aea3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,10 +71,20 @@ public class TezUtilsInternal {
 
   public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
-    File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
-    try (FileInputStream fis = new FileInputStream(confPBFile)) {
-      return ConfigurationProto.parseFrom(fis);
+    FileInputStream confPBBinaryStream = null;
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    try {
+      confPBBinaryStream =
+          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+      confProtoBuilder.mergeFrom(confPBBinaryStream);
+    } finally {
+      if (confPBBinaryStream != null) {
+        confPBBinaryStream.close();
+      }
     }
+
+    ConfigurationProto confProto = confProtoBuilder.build();
+    return confProto;
   }
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -85,6 +95,31 @@ public class TezUtilsInternal {
       }
     }
   }
+//
+//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+//      IOException {
+//    FileInputStream confPBBinaryStream = null;
+//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+//    try {
+//      confPBBinaryStream =
+//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+//      confProtoBuilder.mergeFrom(confPBBinaryStream);
+//    } finally {
+//      if (confPBBinaryStream != null) {
+//        confPBBinaryStream.close();
+//      }
+//    }
+//
+//    ConfigurationProto confProto = confProtoBuilder.build();
+//
+//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+//    if (kvPairList != null && !kvPairList.isEmpty()) {
+//      for (PlanKeyValuePair kvPair : kvPairList) {
+//        conf.set(kvPair.getKey(), kvPair.getValue());
+//      }
+//    }
+//  }
+
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
     StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 140ada1..6baea48 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,8 +24,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -85,6 +83,7 @@ public class LocalClient extends FrameworkClient {
   @Override
   public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
     this.conf = tezConf;
+    tezConf.set("fs.defaultFS", "file:///");
     // Tez libs already in the client's classpath
     this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -287,34 +286,19 @@ public class LocalClient extends FrameworkClient {
         try {
           ApplicationId appId = appContext.getApplicationId();
 
-          // Set up working directory for DAGAppMaster.
-          // The staging directory may be on the default file system, which may or may not
-          // be the local FS. For example, when using testing Hive against a pseudo-distributed
-          // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
-          // directories on HDFS, and sets the Tez staging directory to be the session's
-          // scratch directory.
-          //
-          // To handle this case, we need to copy over the staging data back onto the
-          // local file system, where the rest of the Tez Child code expects it.
-          //
-          // NOTE: we base the local working directory path off of the staging path, even
-          // though it might be on a different file system. Typically they're both in a
-          // path starting with /tmp, but in the future we may want to use a different
-          // temp directory locally.
+          // Set up working directory for DAGAppMaster
           Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          FileSystem stagingFs = staging.getFileSystem(conf);
-
-          FileSystem localFs = FileSystem.getLocal(conf);
-          Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
+          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
           LOG.info("Using working directory: " + userDir.toUri().getPath());
 
+          FileSystem fs = FileSystem.get(conf);
           // copy data from staging directory to working directory to simulate the resource localizing
-          FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
+          FileUtil.copy(fs, staging, fs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
-          localFs.mkdirs(logDir);
-          localFs.mkdirs(localDir);
+          fs.mkdirs(logDir);
+          fs.mkdirs(localDir);
 
           UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
@@ -373,11 +357,30 @@ public class LocalClient extends FrameworkClient {
 
     // Read in additional information about external services
     AMPluginDescriptorProto amPluginDescriptorProto =
-        TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
-            .getAmPluginDescriptor();
+        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
+
 
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
         versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
   }
+
+  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
+                                                          String applicationIdString) throws
+      IOException {
+    Path tezSysStagingPath = TezCommonUtils
+        .getTezSystemStagingPath(conf, applicationIdString);
+    // Remove the filesystem qualifier.
+    String unqualifiedPath = tezSysStagingPath.toUri().getPath();
+
+    DAGProtos.ConfigurationProto confProto =
+        TezUtilsInternal
+            .readUserSpecifiedTezConfiguration(unqualifiedPath);
+    AMPluginDescriptorProto amPluginDescriptorProto = null;
+    if (confProto.hasAmPluginDescriptor()) {
+      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+    }
+    return amPluginDescriptorProto;
+  }
+
 }
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index cb52105..6b626b1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,7 +276,8 @@ public abstract class TezExampleBase extends Configured implements Tool {
 
   protected void printExtraOptionsUsage(PrintStream ps) {
     ps.println("Tez example extra options supported are");
-    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
+    // TODO TEZ-1348 make it able to access dfs in tez local mode
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
         + " run it in distributed mode without this option");
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 318349c..2a5b65f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,16 +20,12 @@ package org.apache.tez.test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -47,73 +43,23 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.*;
 
-@RunWith(Parameterized.class)
 public class TestLocalMode {
 
   private static final File TEST_DIR = new File(
       System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
 
-  private static MiniDFSCluster dfsCluster;
-  private static FileSystem remoteFs;
-
-  @Parameterized.Parameter
-  public boolean useDfs;
-
-  @Parameterized.Parameters(name = "useDFS:{0}")
-  public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[][]{{ false }, { true }});
-  }
-
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    try {
-      Configuration conf = new Configuration();
-      dfsCluster =
-          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
-              .racks(null).build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-  }
-
-  @AfterClass
-  public static void afterClass() throws InterruptedException {
-    if (dfsCluster != null) {
-      try {
-        dfsCluster.shutdown();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private TezConfiguration createConf() {
-    TezConfiguration conf = new TezConfiguration();
-    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    if (useDfs) {
-      conf.set("fs.defaultFS", remoteFs.getUri().toString());
-    } else {
-      conf.set("fs.defaultFS", "file:///");
-    }
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
-    return conf;
-  }
-
   @Test(timeout = 30000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
     tezClient1.start();
 
@@ -126,7 +72,11 @@ public class TestLocalMode {
     dagClient1.close();
     tezClient1.stop();
 
-    TezConfiguration tezConf2 = createConf();
+
+    TezConfiguration tezConf2 = new TezConfiguration();
+    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf2.set("fs.defaultFS", "file:///");
+    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
     tezClient2.start();
@@ -141,7 +91,10 @@ public class TestLocalMode {
   @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
 
@@ -155,7 +108,10 @@ public class TestLocalMode {
     tezClient1.stop();
 
 
-    TezConfiguration tezConf2 = createConf();
+    TezConfiguration tezConf2 = new TezConfiguration();
+    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf2.set("fs.defaultFS", "file:///");
+    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
     tezClient2.start();
@@ -170,7 +126,10 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -191,7 +150,10 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -249,7 +211,10 @@ public class TestLocalMode {
     String[] outputPaths =  new String[dags];
     DAGClient[] dagClients = new DAGClient[dags];
 
-    TezConfiguration tezConf = createConf();
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf.set("fs.defaultFS", "file:///");
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
     tezClient.start();
 


[tez] 02/02: TEZ-1348. Allow Tez local mode to run against filesystems other than local FS. (Todd Lipcon via sseth)

Posted by ss...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sseth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 82c1d75c7b3abf7316ba265a599b910fcc2360c7
Author: Siddharth Seth <ss...@apache.org>
AuthorDate: Mon May 6 10:52:19 2019 -0700

    TEZ-1348. Allow Tez local mode to run against filesystems other than
    local FS. (Todd Lipcon via sseth)
---
 .../org/apache/tez/common/TezUtilsInternal.java    | 41 +--------
 .../java/org/apache/tez/client/LocalClient.java    | 53 ++++++------
 .../org/apache/tez/examples/TezExampleBase.java    |  3 +-
 .../java/org/apache/tez/test/TestLocalMode.java    | 98 +++++++++++++++-------
 4 files changed, 97 insertions(+), 98 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 5d7aea3..adcae8a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,20 +71,10 @@ public class TezUtilsInternal {
 
   public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
-    FileInputStream confPBBinaryStream = null;
-    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-    try {
-      confPBBinaryStream =
-          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      confProtoBuilder.mergeFrom(confPBBinaryStream);
-    } finally {
-      if (confPBBinaryStream != null) {
-        confPBBinaryStream.close();
-      }
+    File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
+    try (FileInputStream fis = new FileInputStream(confPBFile)) {
+      return ConfigurationProto.parseFrom(fis);
     }
-
-    ConfigurationProto confProto = confProtoBuilder.build();
-    return confProto;
   }
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -95,31 +85,6 @@ public class TezUtilsInternal {
       }
     }
   }
-//
-//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
-//      IOException {
-//    FileInputStream confPBBinaryStream = null;
-//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-//    try {
-//      confPBBinaryStream =
-//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-//      confProtoBuilder.mergeFrom(confPBBinaryStream);
-//    } finally {
-//      if (confPBBinaryStream != null) {
-//        confPBBinaryStream.close();
-//      }
-//    }
-//
-//    ConfigurationProto confProto = confProtoBuilder.build();
-//
-//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-//    if (kvPairList != null && !kvPairList.isEmpty()) {
-//      for (PlanKeyValuePair kvPair : kvPairList) {
-//        conf.set(kvPair.getKey(), kvPair.getValue());
-//      }
-//    }
-//  }
-
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
     StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 6baea48..9006971 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -50,7 +51,6 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClientHandler;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
@@ -83,7 +83,6 @@ public class LocalClient extends FrameworkClient {
   @Override
   public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
     this.conf = tezConf;
-    tezConf.set("fs.defaultFS", "file:///");
     // Tez libs already in the client's classpath
     this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -286,19 +285,34 @@ public class LocalClient extends FrameworkClient {
         try {
           ApplicationId appId = appContext.getApplicationId();
 
-          // Set up working directory for DAGAppMaster
+          // Set up working directory for DAGAppMaster.
+          // The staging directory may be on the default file system, which may or may not
+          // be the local FS. For example, when using testing Hive against a pseudo-distributed
+          // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
+          // directories on HDFS, and sets the Tez staging directory to be the session's
+          // scratch directory.
+          //
+          // To handle this case, we need to copy over the staging data back onto the
+          // local file system, where the rest of the Tez Child code expects it.
+          //
+          // NOTE: we base the local working directory path off of the staging path, even
+          // though it might be on a different file system. Typically they're both in a
+          // path starting with /tmp, but in the future we may want to use a different
+          // temp directory locally.
           Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+          FileSystem stagingFs = staging.getFileSystem(conf);
+
+          FileSystem localFs = FileSystem.getLocal(conf);
+          Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
           LOG.info("Using working directory: " + userDir.toUri().getPath());
 
-          FileSystem fs = FileSystem.get(conf);
           // copy data from staging directory to working directory to simulate the resource localizing
-          FileUtil.copy(fs, staging, fs, userDir, false, conf);
+          FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
-          fs.mkdirs(logDir);
-          fs.mkdirs(localDir);
+          localFs.mkdirs(logDir);
+          localFs.mkdirs(localDir);
 
           UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
@@ -357,30 +371,11 @@ public class LocalClient extends FrameworkClient {
 
     // Read in additional information about external services
     AMPluginDescriptorProto amPluginDescriptorProto =
-        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
-
+        TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
+            .getAmPluginDescriptor();
 
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
         versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
   }
-
-  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
-                                                          String applicationIdString) throws
-      IOException {
-    Path tezSysStagingPath = TezCommonUtils
-        .getTezSystemStagingPath(conf, applicationIdString);
-    // Remove the filesystem qualifier.
-    String unqualifiedPath = tezSysStagingPath.toUri().getPath();
-
-    DAGProtos.ConfigurationProto confProto =
-        TezUtilsInternal
-            .readUserSpecifiedTezConfiguration(unqualifiedPath);
-    AMPluginDescriptorProto amPluginDescriptorProto = null;
-    if (confProto.hasAmPluginDescriptor()) {
-      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
-    }
-    return amPluginDescriptorProto;
-  }
-
 }
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 6b626b1..cb52105 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,8 +276,7 @@ public abstract class TezExampleBase extends Configured implements Tool {
 
   protected void printExtraOptionsUsage(PrintStream ps) {
     ps.println("Tez example extra options supported are");
-    // TODO TEZ-1348 make it able to access dfs in tez local mode
-    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
         + " run it in distributed mode without this option");
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 2a5b65f..ffc67fe 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,12 +20,16 @@ package org.apache.tez.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -43,23 +47,78 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.*;
 
+/**
+ * Tests for running Tez in local execution mode (without YARN).
+ */
+@RunWith(Parameterized.class)
 public class TestLocalMode {
 
   private static final File TEST_DIR = new File(
       System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
 
+  private static MiniDFSCluster dfsCluster;
+  private static FileSystem remoteFs;
+
+  private final boolean useDfs;
+
+  @Parameterized.Parameters(name = "useDFS:{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{{false}, {true}});
+  }
+
+  public TestLocalMode(boolean useDfs) {
+    this.useDfs = useDfs;
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    try {
+      Configuration conf = new Configuration();
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+              .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (dfsCluster != null) {
+      try {
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private TezConfiguration createConf() {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    if (useDfs) {
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());
+    } else {
+      conf.set("fs.defaultFS", "file:///");
+    }
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    return conf;
+  }
+
   @Test(timeout = 30000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
     tezClient1.start();
 
@@ -72,11 +131,7 @@ public class TestLocalMode {
     dagClient1.close();
     tezClient1.stop();
 
-
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
     tezClient2.start();
@@ -91,10 +146,7 @@ public class TestLocalMode {
   @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
 
@@ -108,10 +160,7 @@ public class TestLocalMode {
     tezClient1.stop();
 
 
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
     tezClient2.start();
@@ -126,10 +175,7 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -150,10 +196,7 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -211,10 +254,7 @@ public class TestLocalMode {
     String[] outputPaths =  new String[dags];
     DAGClient[] dagClients = new DAGClient[dags];
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf.set("fs.defaultFS", "file:///");
-    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf = createConf();
     TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
     tezClient.start();