You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/05 22:40:56 UTC
git commit: TEZ-1097. Tez assumes that the scratch directory has to
be same as the default filesystem. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master d90dbd8dc -> cb9eb515b
TEZ-1097. Tez assumes that the scratch directory has to be same as the default filesystem. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cb9eb515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cb9eb515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cb9eb515
Branch: refs/heads/master
Commit: cb9eb515bde88b1f145efc497fc264172d7e32da
Parents: d90dbd8
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 5 13:40:10 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 5 13:40:10 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/AMConfiguration.java | 5 ++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 10 +++--
.../mapreduce/examples/OrderedWordCount.java | 7 +++-
.../java/org/apache/tez/test/TestTezJobs.java | 44 ++++++++++++++++++++
4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 2016ffc..9972e8c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -77,8 +77,9 @@ public class AMConfiguration {
+ ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
}
try {
- FileSystem fs = FileSystem.get(amConf);
- this.stagingDir = fs.resolvePath(new Path(stagingDirStr));
+ Path p = new Path(stagingDirStr);
+ FileSystem fs = p.getFileSystem(amConf);
+ this.stagingDir = fs.resolvePath(p);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2df3de1..e16064a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -378,14 +378,16 @@ public class DAGAppMaster extends AbstractService {
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
- recoveryDataDir = FileSystem.get(conf).makeQualified(new Path(
+ Path recoveryPath = new Path(
conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
- this.appAttemptID.getApplicationId().toString() +
- File.separator + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME));
+ this.appAttemptID.getApplicationId().toString() +
+ Path.SEPARATOR + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME);
+
+ recoveryFS = recoveryPath.getFileSystem(conf);
+ recoveryDataDir = recoveryFS.makeQualified(recoveryPath);
currentRecoveryDataDir = new Path(recoveryDataDir,
Integer.toString(this.appAttemptID.getAttemptId()));
- recoveryFS = FileSystem.get(recoveryDataDir.toUri(), conf);
if (isSession) {
FileInputStream sessionResourcesStream = null;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 78c2a37..18a6327 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -352,8 +352,11 @@ public class OrderedWordCount extends Configured implements Tool {
+ user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+ Path.SEPARATOR + appId.toString();
Path stagingDir = new Path(stagingDirStr);
+ FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+ pathFs.mkdirs(new Path(stagingDirStr));
+
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
- stagingDir = fs.makeQualified(stagingDir);
+ stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
@@ -519,7 +522,7 @@ public class OrderedWordCount extends Configured implements Tool {
throw e;
} finally {
if (!retainStagingDir) {
- fs.delete(stagingDir, true);
+ pathFs.delete(stagingDir, true);
}
if (useTezSession) {
LOG.info("Shutting down session");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 173855a..94cdfdc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -271,4 +271,48 @@ public class TestTezJobs {
inStream.close();
assertEquals(0, expectedResult.size());
}
+
+ @Test
+ public void testNonDefaultFSStagingDir() throws Exception {
+ SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+ DAG dag = new DAG("TezSleepProcessor");
+ Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+ SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+ Resource.newInstance(1024, 1));
+ dag.addVertex(vertex);
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
+ + String.valueOf(random.nextInt(100000)));
+ FileSystem localFs = FileSystem.getLocal(tezConf);
+ stagingDir = localFs.makeQualified(stagingDir);
+ localFs.mkdirs(stagingDir);
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+ TezClient tezClient = new TezClient(tezConf);
+ AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+ new HashMap<String, LocalResource>(), tezConf, null);
+
+ DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+ + dagStatus.getState());
+ Thread.sleep(500l);
+ dagStatus = dagClient.getDAGStatus(null);
+ }
+ dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+ assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+ assertNotNull(dagStatus.getDAGCounters());
+ assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+ assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+ ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+
+
+ }
+
+
}