You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tez.apache.org by ss...@apache.org on 2019/05/06 17:52:39 UTC
[tez] 01/02: Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"
This is an automated email from the ASF dual-hosted git repository.
sseth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
commit 4bbd3c2e7600752cfe0d074d61e00e12ea0ee748
Author: Siddharth Seth <ss...@apache.org>
AuthorDate: Mon May 6 10:51:43 2019 -0700
Revert "TEZ-1348. Allow Tez local mode to run against filesystems other than"
This reverts commit 46b4004d97dd2f2cde491a93abcdd48c9b82f68e.
---
.../org/apache/tez/common/TezUtilsInternal.java | 41 +++++++++-
.../java/org/apache/tez/client/LocalClient.java | 53 ++++++------
.../org/apache/tez/examples/TezExampleBase.java | 3 +-
.../java/org/apache/tez/test/TestLocalMode.java | 93 +++++++---------------
4 files changed, 97 insertions(+), 93 deletions(-)
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index adcae8a..5d7aea3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -71,10 +71,20 @@ public class TezUtilsInternal {
public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
IOException {
- File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
- try (FileInputStream fis = new FileInputStream(confPBFile)) {
- return ConfigurationProto.parseFrom(fis);
+ FileInputStream confPBBinaryStream = null;
+ ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+ try {
+ confPBBinaryStream =
+ new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+ confProtoBuilder.mergeFrom(confPBBinaryStream);
+ } finally {
+ if (confPBBinaryStream != null) {
+ confPBBinaryStream.close();
+ }
}
+
+ ConfigurationProto confProto = confProtoBuilder.build();
+ return confProto;
}
public static void addUserSpecifiedTezConfiguration(Configuration conf,
@@ -85,6 +95,31 @@ public class TezUtilsInternal {
}
}
}
+//
+// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+// IOException {
+// FileInputStream confPBBinaryStream = null;
+// ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+// try {
+// confPBBinaryStream =
+// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+// confProtoBuilder.mergeFrom(confPBBinaryStream);
+// } finally {
+// if (confPBBinaryStream != null) {
+// confPBBinaryStream.close();
+// }
+// }
+//
+// ConfigurationProto confProto = confProtoBuilder.build();
+//
+// List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+// if (kvPairList != null && !kvPairList.isEmpty()) {
+// for (PlanKeyValuePair kvPair : kvPairList) {
+// conf.set(kvPair.getKey(), kvPair.getValue());
+// }
+// }
+// }
+
public static byte[] compressBytes(byte[] inBytes) throws IOException {
StopWatch sw = new StopWatch().start();
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 140ada1..6baea48 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -24,8 +24,6 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
-import javax.annotation.Nullable;
-
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -85,6 +83,7 @@ public class LocalClient extends FrameworkClient {
@Override
public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
this.conf = tezConf;
+ tezConf.set("fs.defaultFS", "file:///");
// Tez libs already in the client's classpath
this.conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
this.conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
@@ -287,34 +286,19 @@ public class LocalClient extends FrameworkClient {
try {
ApplicationId appId = appContext.getApplicationId();
- // Set up working directory for DAGAppMaster.
- // The staging directory may be on the default file system, which may or may not
- // be the local FS. For example, when using testing Hive against a pseudo-distributed
- // cluster, it's useful for the default FS to be HDFS. Hive then puts its scratch
- // directories on HDFS, and sets the Tez staging directory to be the session's
- // scratch directory.
- //
- // To handle this case, we need to copy over the staging data back onto the
- // local file system, where the rest of the Tez Child code expects it.
- //
- // NOTE: we base the local working directory path off of the staging path, even
- // though it might be on a different file system. Typically they're both in a
- // path starting with /tmp, but in the future we may want to use a different
- // temp directory locally.
+ // Set up working directory for DAGAppMaster
Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
- FileSystem stagingFs = staging.getFileSystem(conf);
-
- FileSystem localFs = FileSystem.getLocal(conf);
- Path userDir = localFs.makeQualified(new Path(staging.toUri().getPath() + "_wd"));
+ Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
LOG.info("Using working directory: " + userDir.toUri().getPath());
+ FileSystem fs = FileSystem.get(conf);
// copy data from staging directory to working directory to simulate the resource localizing
- FileUtil.copy(stagingFs, staging, localFs, userDir, false, conf);
+ FileUtil.copy(fs, staging, fs, userDir, false, conf);
// Prepare Environment
Path logDir = new Path(userDir, "localmode-log-dir");
Path localDir = new Path(userDir, "localmode-local-dir");
- localFs.mkdirs(logDir);
- localFs.mkdirs(localDir);
+ fs.mkdirs(logDir);
+ fs.mkdirs(localDir);
UserGroupInformation.setConfiguration(conf);
// Add session specific credentials to the AM credentials.
@@ -373,11 +357,30 @@ public class LocalClient extends FrameworkClient {
// Read in additional information about external services
AMPluginDescriptorProto amPluginDescriptorProto =
- TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
- .getAmPluginDescriptor();
+ getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
+
return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
}
+
+ private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
+ String applicationIdString) throws
+ IOException {
+ Path tezSysStagingPath = TezCommonUtils
+ .getTezSystemStagingPath(conf, applicationIdString);
+ // Remove the filesystem qualifier.
+ String unqualifiedPath = tezSysStagingPath.toUri().getPath();
+
+ DAGProtos.ConfigurationProto confProto =
+ TezUtilsInternal
+ .readUserSpecifiedTezConfiguration(unqualifiedPath);
+ AMPluginDescriptorProto amPluginDescriptorProto = null;
+ if (confProto.hasAmPluginDescriptor()) {
+ amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+ }
+ return amPluginDescriptorProto;
+ }
+
}
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index cb52105..6b626b1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -276,7 +276,8 @@ public abstract class TezExampleBase extends Configured implements Tool {
protected void printExtraOptionsUsage(PrintStream ps) {
ps.println("Tez example extra options supported are");
- ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, "
+ // TODO TEZ-1348 make it able to access dfs in tez local mode
+ ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
+ " run it in distributed mode without this option");
ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
+ " enable split grouping without this option.");
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 318349c..2a5b65f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -20,16 +20,12 @@ package org.apache.tez.test;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -47,73 +43,23 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import static org.junit.Assert.*;
-@RunWith(Parameterized.class)
public class TestLocalMode {
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");
- private static MiniDFSCluster dfsCluster;
- private static FileSystem remoteFs;
-
- @Parameterized.Parameter
- public boolean useDfs;
-
- @Parameterized.Parameters(name = "useDFS:{0}")
- public static Collection<Object[]> params() {
- return Arrays.asList(new Object[][]{{ false }, { true }});
- }
-
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- try {
- Configuration conf = new Configuration();
- dfsCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
- .racks(null).build();
- remoteFs = dfsCluster.getFileSystem();
- } catch (IOException io) {
- throw new RuntimeException("problem starting mini dfs cluster", io);
- }
- }
-
- @AfterClass
- public static void afterClass() throws InterruptedException {
- if (dfsCluster != null) {
- try {
- dfsCluster.shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- private TezConfiguration createConf() {
- TezConfiguration conf = new TezConfiguration();
- conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
- if (useDfs) {
- conf.set("fs.defaultFS", remoteFs.getUri().toString());
- } else {
- conf.set("fs.defaultFS", "file:///");
- }
- conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
- return conf;
- }
-
@Test(timeout = 30000)
public void testMultipleClientsWithSession() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = createConf();
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
tezClient1.start();
@@ -126,7 +72,11 @@ public class TestLocalMode {
dagClient1.close();
tezClient1.stop();
- TezConfiguration tezConf2 = createConf();
+
+ TezConfiguration tezConf2 = new TezConfiguration();
+ tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf2.set("fs.defaultFS", "file:///");
+ tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
tezClient2.start();
@@ -141,7 +91,10 @@ public class TestLocalMode {
@Test(timeout = 10000)
public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = createConf();
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -155,7 +108,10 @@ public class TestLocalMode {
tezClient1.stop();
- TezConfiguration tezConf2 = createConf();
+ TezConfiguration tezConf2 = new TezConfiguration();
+ tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf2.set("fs.defaultFS", "file:///");
+ tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
tezClient2.start();
@@ -170,7 +126,10 @@ public class TestLocalMode {
@Test(timeout = 20000)
public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = createConf();
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -191,7 +150,10 @@ public class TestLocalMode {
@Test(timeout = 20000)
public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
IOException {
- TezConfiguration tezConf1 = createConf();
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
@@ -249,7 +211,10 @@ public class TestLocalMode {
String[] outputPaths = new String[dags];
DAGClient[] dagClients = new DAGClient[dags];
- TezConfiguration tezConf = createConf();
+ TezConfiguration tezConf = new TezConfiguration();
+ tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf.set("fs.defaultFS", "file:///");
+ tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true);
tezClient.start();