You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2019/05/06 17:52:40 UTC

[tez] 02/02: TEZ-1348. Allow Tez local mode to run against filesystems other than local FS. (Todd Lipcon via sseth)

This is an automated email from the ASF dual-hosted git repository.

sseth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 82c1d75c7b3abf7316ba265a599b910fcc2360c7
Author: Siddharth Seth <ss...@apache.org>
AuthorDate: Mon May 6 10:52:19 2019 -0700

    TEZ-1348. Allow Tez local mode to run against filesystems other than
    local FS. (Todd Lipcon via sseth)
---
 .../org/apache/tez/common/TezUtilsInternal.java    | 41 +--------
 .../java/org/apache/tez/client/LocalClient.java    | 53 ++++++------
 .../org/apache/tez/examples/TezExampleBase.java    |  3 +-
 .../java/org/apache/tez/test/TestLocalMode.java    | 98 +++++++++++++++-------
 4 files changed, 97 insertions(+), 98 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 5d7aea3..adcae8a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,20 +71,10 @@ public class TezUtilsInternal {
 
   public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
-    FileInputStream confPBBinaryStream = null;
-    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-    try {
-      confPBBinaryStream =
-          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-      confProtoBuilder.mergeFrom(confPBBinaryStream);
-    } finally {
-      if (confPBBinaryStream != null) {
-        confPBBinaryStream.close();
-      }
+    File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
+    try (FileInputStream fis = new FileInputStream(confPBFile)) {
+      return ConfigurationProto.parseFrom(fis);
     }
-
-    ConfigurationProto confProto = confProtoBuilder.build();
-    return confProto;
   }
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -95,31 +85,6 @@ public class TezUtilsInternal {
       }
     }
   }
-//
-//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
-//      IOException {
-//    FileInputStream confPBBinaryStream = null;
-//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-//    try {
-//      confPBBinaryStream =
-//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
-//      confProtoBuilder.mergeFrom(confPBBinaryStream);
-//    } finally {
-//      if (confPBBinaryStream != null) {
-//        confPBBinaryStream.close();
-//      }
-//    }
-//
-//    ConfigurationProto confProto = confProtoBuilder.build();
-//
-//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-//    if (kvPairList != null && !kvPairList.isEmpty()) {
-//      for (PlanKeyValuePair kvPair : kvPairList) {
-//        conf.set(kvPair.getKey(), kvPair.getValue());
-//      }
-//    }
-//  }
-
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
     StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 6baea48..9006971 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -50,7 +51,6 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClientHandler;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
@@ -83,7 +83,6 @@ public class LocalClient extends FrameworkClient {
   @Override
   public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
     this.conf = tezConf;
-    tezConf.set("fs.defaultFS", "file:///");
     // Tez libs already in the client's classpath
     this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -286,19 +285,34 @@ public class LocalClient extends FrameworkClient {
         try {
           ApplicationId appId = appContext.getApplicationId();
 
-          // Set up working directory for DAGAppMaster
+          // Set up working directory for DAGAppMaster.
+          // The staging directory may be on the default file system, which may or may not
+          // be the local FS. For example, when testing Hive against a pseudo-distributed
+          // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
+          // directories on HDFS, and sets the Tez staging directory to be the session's
+          // scratch directory.
+          //
+          // To handle this case, we need to copy over the staging data back onto the
+          // local file system, where the rest of the Tez Child code expects it.
+          //
+          // NOTE: we base the local working directory path off of the staging path, even
+          // though it might be on a different file system. Typically they're both in a
+          // path starting with /tmp, but in the future we may want to use a different
+          // temp directory locally.
           Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+          FileSystem stagingFs = staging.getFileSystem(conf);
+
+          FileSystem localFs = FileSystem.getLocal(conf);
+          Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
           LOG.info("Using working directory: " + userDir.toUri().getPath());
 
-          FileSystem fs = FileSystem.get(conf);
           // copy data from staging directory to working directory to simulate the resource localizing
-          FileUtil.copy(fs, staging, fs, userDir, false, conf);
+          FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
-          fs.mkdirs(logDir);
-          fs.mkdirs(localDir);
+          localFs.mkdirs(logDir);
+          localFs.mkdirs(localDir);
 
           UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
@@ -357,30 +371,11 @@ public class LocalClient extends FrameworkClient {
 
     // Read in additional information about external services
     AMPluginDescriptorProto amPluginDescriptorProto =
-        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
-
+        TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
+            .getAmPluginDescriptor();
 
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
         versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
   }
-
-  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
-                                                          String applicationIdString) throws
-      IOException {
-    Path tezSysStagingPath = TezCommonUtils
-        .getTezSystemStagingPath(conf, applicationIdString);
-    // Remove the filesystem qualifier.
-    String unqualifiedPath = tezSysStagingPath.toUri().getPath();
-
-    DAGProtos.ConfigurationProto confProto =
-        TezUtilsInternal
-            .readUserSpecifiedTezConfiguration(unqualifiedPath);
-    AMPluginDescriptorProto amPluginDescriptorProto = null;
-    if (confProto.hasAmPluginDescriptor()) {
-      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
-    }
-    return amPluginDescriptorProto;
-  }
-
 }
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 6b626b1..cb52105 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,8 +276,7 @@ public abstract class TezExampleBase extends Configured implements Tool {
 
   protected void printExtraOptionsUsage(PrintStream ps) {
     ps.println("Tez example extra options supported are");
-    // TODO TEZ-1348 make it able to access dfs in tez local mode
-    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
         + " run it in distributed mode without this option");
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 2a5b65f..ffc67fe 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,12 +20,16 @@ package org.apache.tez.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -43,23 +47,78 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.*;
 
+/**
+ * Tests for running Tez in local execution mode (without YARN).
+ */
+@RunWith(Parameterized.class)
 public class TestLocalMode {
 
   private static final File TEST_DIR = new File(
       System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
 
+  private static MiniDFSCluster dfsCluster;
+  private static FileSystem remoteFs;
+
+  private final boolean useDfs;
+
+  @Parameterized.Parameters(name = "useDFS:{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{{false}, {true}});
+  }
+
+  public TestLocalMode(boolean useDfs) {
+    this.useDfs = useDfs;
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    try {
+      Configuration conf = new Configuration();
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+              .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (dfsCluster != null) {
+      try {
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private TezConfiguration createConf() {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    if (useDfs) {
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());
+    } else {
+      conf.set("fs.defaultFS", "file:///");
+    }
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    return conf;
+  }
+
   @Test(timeout = 30000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
     tezClient1.start();
 
@@ -72,11 +131,7 @@ public class TestLocalMode {
     dagClient1.close();
     tezClient1.stop();
 
-
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
     tezClient2.start();
@@ -91,10 +146,7 @@ public class TestLocalMode {
   @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
 
@@ -108,10 +160,7 @@ public class TestLocalMode {
     tezClient1.stop();
 
 
-    TezConfiguration tezConf2 = new TezConfiguration();
-    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf2.set("fs.defaultFS", "file:///");
-    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf2 = createConf();
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
     tezClient2.start();
@@ -126,10 +175,7 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -150,10 +196,7 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = new TezConfiguration();
-    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf1.set("fs.defaultFS", "file:///");
-    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf1 = createConf();
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -211,10 +254,7 @@ public class TestLocalMode {
     String[] outputPaths =  new String[dags];
     DAGClient[] dagClients = new DAGClient[dags];
 
-    TezConfiguration tezConf = new TezConfiguration();
-    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    tezConf.set("fs.defaultFS", "file:///");
-    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    TezConfiguration tezConf = createConf();
     TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
     tezClient.start();