You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2019/05/06 17:52:39 UTC

[tez] 01/02: Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"

This is an automated email from the ASF dual-hosted git repository.

sseth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 4bbd3c2e7600752cfe0d074d61e00e12ea0ee748
Author: Siddharth Seth <ss...@apache.org>
AuthorDate: Mon May 6 10:51:43 2019 -0700

    Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"
    
    This reverts commit 46b4004d97dd2f2cde491a93abcdd48c9b82f68e.
---
 .../org/apache/tez/common/TezUtilsInternal.java    | 41 +++++++++-
 .../java/org/apache/tez/client/LocalClient.java    | 53 ++++++------
 .../org/apache/tez/examples/TezExampleBase.java    |  3 +-
 .../java/org/apache/tez/test/TestLocalMode.java    | 93 +++++++---------------
 4 files changed, 97 insertions(+), 93 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index adcae8a..5d7aea3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,10 +71,20 @@ public class TezUtilsInternal {
 
   public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
-    File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
-    try (FileInputStream fis = new FileInputStream(confPBFile)) {
-      return ConfigurationProto.parseFrom(fis);
+    FileInputStream confPBBinaryStream = null;
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    try {
+      confPBBinaryStream =
+          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+      confProtoBuilder.mergeFrom(confPBBinaryStream);
+    } finally {
+      if (confPBBinaryStream != null) {
+        confPBBinaryStream.close();
+      }
     }
+
+    ConfigurationProto confProto = confProtoBuilder.build();
+    return confProto;
   }
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -85,6 +95,31 @@ public class TezUtilsInternal {
       }
     }
   }
+//
+//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+//      IOException {
+//    FileInputStream confPBBinaryStream = null;
+//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+//    try {
+//      confPBBinaryStream =
+//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+//      confProtoBuilder.mergeFrom(confPBBinaryStream);
+//    } finally {
+//      if (confPBBinaryStream != null) {
+//        confPBBinaryStream.close();
+//      }
+//    }
+//
+//    ConfigurationProto confProto = confProtoBuilder.build();
+//
+//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+//    if (kvPairList != null && !kvPairList.isEmpty()) {
+//      for (PlanKeyValuePair kvPair : kvPairList) {
+//        conf.set(kvPair.getKey(), kvPair.getValue());
+//      }
+//    }
+//  }
+
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
     StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 140ada1..6baea48 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,8 +24,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -85,6 +83,7 @@ public class LocalClient extends FrameworkClient {
   @Override
   public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
     this.conf = tezConf;
+    tezConf.set("fs.defaultFS", "file:///");
     // Tez libs already in the client's classpath
     this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -287,34 +286,19 @@ public class LocalClient extends FrameworkClient {
         try {
           ApplicationId appId = appContext.getApplicationId();
 
-          // Set up working directory for DAGAppMaster.
-          // The staging directory may be on the default file system, which may or may not
-          // be the local FS. For example, when using testing Hive against a pseudo-distributed
-          // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
-          // directories on HDFS, and sets the Tez staging directory to be the session's
-          // scratch directory.
-          //
-          // To handle this case, we need to copy over the staging data back onto the
-          // local file system, where the rest of the Tez Child code expects it.
-          //
-          // NOTE: we base the local working directory path off of the staging path, even
-          // though it might be on a different file system. Typically they're both in a
-          // path starting with /tmp, but in the future we may want to use a different
-          // temp directory locally.
+          // Set up working directory for DAGAppMaster
           Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          FileSystem stagingFs = staging.getFileSystem(conf);
-
-          FileSystem localFs = FileSystem.getLocal(conf);
-          Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
+          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
           LOG.info("Using working directory: " + userDir.toUri().getPath());
 
+          FileSystem fs = FileSystem.get(conf);
           // copy data from staging directory to working directory to simulate the resource localizing
-          FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
+          FileUtil.copy(fs, staging, fs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
-          localFs.mkdirs(logDir);
-          localFs.mkdirs(localDir);
+          fs.mkdirs(logDir);
+          fs.mkdirs(localDir);
 
           UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
@@ -373,11 +357,30 @@ public class LocalClient extends FrameworkClient {
 
     // Read in additional information about external services
     AMPluginDescriptorProto amPluginDescriptorProto =
-        TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
-            .getAmPluginDescriptor();
+        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
+
 
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
         versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
   }
+
+  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
+                                                          String applicationIdString) throws
+      IOException {
+    Path tezSysStagingPath = TezCommonUtils
+        .getTezSystemStagingPath(conf, applicationIdString);
+    // Remove the filesystem qualifier.
+    String unqualifiedPath = tezSysStagingPath.toUri().getPath();
+
+    DAGProtos.ConfigurationProto confProto =
+        TezUtilsInternal
+            .readUserSpecifiedTezConfiguration(unqualifiedPath);
+    AMPluginDescriptorProto amPluginDescriptorProto = null;
+    if (confProto.hasAmPluginDescriptor()) {
+      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+    }
+    return amPluginDescriptorProto;
+  }
+
 }
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index cb52105..6b626b1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,7 +276,8 @@ public abstract class TezExampleBase extends Configured implements Tool {
 
   protected void printExtraOptionsUsage(PrintStream ps) {
     ps.println("Tez example extra options supported are");
-    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
+    // TODO TEZ-1348 make it able to access dfs in tez local mode
+    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
         + " run it in distributed mode without this option");
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 318349c..2a5b65f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,16 +20,12 @@ package org.apache.tez.test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -47,73 +43,23 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.*;
 
-@RunWith(Parameterized.class)
 public class TestLocalMode {
 
   private static final File TEST_DIR = new File(
       System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
 
-  private static MiniDFSCluster dfsCluster;
-  private static FileSystem remoteFs;
-
-  @Parameterized.Parameter
-  public boolean useDfs;
-
-  @Parameterized.Parameters(name = "useDFS:{0}")
-  public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[][]{{ false }, { true }});
-  }
-
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    try {
-      Configuration conf = new Configuration();
-      dfsCluster =
-          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
-              .racks(null).build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-  }
-
-  @AfterClass
-  public static void afterClass() throws InterruptedException {
-    if (dfsCluster != null) {
-      try {
-        dfsCluster.shutdown();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private TezConfiguration createConf() {
-    TezConfiguration conf = new TezConfiguration();
-    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-    if (useDfs) {
-      conf.set("fs.defaultFS", remoteFs.getUri().toString());
-    } else {
-      conf.set("fs.defaultFS", "file:///");
-    }
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
-    return conf;
-  }
-
   @Test(timeout = 30000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
     tezClient1.start();
 
@@ -126,7 +72,11 @@ public class TestLocalMode {
     dagClient1.close();
     tezClient1.stop();
 
-    TezConfiguration tezConf2 = createConf();
+
+    TezConfiguration tezConf2 = new TezConfiguration();
+    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf2.set("fs.defaultFS", "file:///");
+    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
     tezClient2.start();
@@ -141,7 +91,10 @@ public class TestLocalMode {
   @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
 
@@ -155,7 +108,10 @@ public class TestLocalMode {
     tezClient1.stop();
 
 
-    TezConfiguration tezConf2 = createConf();
+    TezConfiguration tezConf2 = new TezConfiguration();
+    tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf2.set("fs.defaultFS", "file:///");
+    tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
     tezClient2.start();
@@ -170,7 +126,10 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -191,7 +150,10 @@ public class TestLocalMode {
   @Test(timeout = 20000)
   public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
       IOException {
-    TezConfiguration tezConf1 = createConf();
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     // Run in non-session mode so that the AM terminates
     TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
     tezClient1.start();
@@ -249,7 +211,10 @@ public class TestLocalMode {
     String[] outputPaths =  new String[dags];
     DAGClient[] dagClients = new DAGClient[dags];
 
-    TezConfiguration tezConf = createConf();
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf.set("fs.defaultFS", "file:///");
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
     tezClient.start();