You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/05 22:40:56 UTC

git commit: TEZ-1097. Tez assumes that the scratch directory has to be same as the default filesystem. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master d90dbd8dc -> cb9eb515b


TEZ-1097. Tez assumes that the scratch directory has to be same as the default filesystem. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cb9eb515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cb9eb515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cb9eb515

Branch: refs/heads/master
Commit: cb9eb515bde88b1f145efc497fc264172d7e32da
Parents: d90dbd8
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 5 13:40:10 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 5 13:40:10 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/AMConfiguration.java  |  5 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 10 +++--
 .../mapreduce/examples/OrderedWordCount.java    |  7 +++-
 .../java/org/apache/tez/test/TestTezJobs.java   | 44 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 2016ffc..9972e8c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -77,8 +77,9 @@ public class AMConfiguration {
           + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
     }
     try {
-      FileSystem fs = FileSystem.get(amConf);
-      this.stagingDir = fs.resolvePath(new Path(stagingDirStr));
+      Path p = new Path(stagingDirStr);
+      FileSystem fs = p.getFileSystem(amConf);
+      this.stagingDir = fs.resolvePath(p);
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 2df3de1..e16064a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -378,14 +378,16 @@ public class DAGAppMaster extends AbstractService {
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
 
-    recoveryDataDir = FileSystem.get(conf).makeQualified(new Path(
+    Path recoveryPath = new Path(
         conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
             TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
-              this.appAttemptID.getApplicationId().toString() +
-                  File.separator + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME));
+        this.appAttemptID.getApplicationId().toString() +
+            Path.SEPARATOR + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME);
+
+    recoveryFS = recoveryPath.getFileSystem(conf);
+    recoveryDataDir = recoveryFS.makeQualified(recoveryPath);
     currentRecoveryDataDir = new Path(recoveryDataDir,
         Integer.toString(this.appAttemptID.getAttemptId()));
-    recoveryFS = FileSystem.get(recoveryDataDir.toUri(), conf);
 
     if (isSession) {
       FileInputStream sessionResourcesStream = null;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 78c2a37..18a6327 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -352,8 +352,11 @@ public class OrderedWordCount extends Configured implements Tool {
         + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
         + Path.SEPARATOR + appId.toString();
     Path stagingDir = new Path(stagingDirStr);
+    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+    pathFs.mkdirs(new Path(stagingDirStr));
+
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
+    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
     
     TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
     TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
@@ -519,7 +522,7 @@ public class OrderedWordCount extends Configured implements Tool {
       throw e;
     } finally {
       if (!retainStagingDir) {
-        fs.delete(stagingDir, true);
+        pathFs.delete(stagingDir, true);
       }
       if (useTezSession) {
         LOG.info("Shutting down session");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb9eb515/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 173855a..94cdfdc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -271,4 +271,48 @@ public class TestTezJobs {
     inStream.close();
     assertEquals(0, expectedResult.size());
   }
+
+  @Test
+  public void testNonDefaultFSStagingDir() throws Exception {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
+        + String.valueOf(random.nextInt(100000)));
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    stagingDir = localFs.makeQualified(stagingDir);
+    localFs.mkdirs(stagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+
+
+  }
+
+
 }