You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/12/16 03:04:05 UTC

tez git commit: TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 42ef63665 -> 685fa742f


TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/685fa742
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/685fa742
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/685fa742

Branch: refs/heads/master
Commit: 685fa742f7042876d8a87544b1e08ce566837e65
Parents: 42ef636
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Dec 16 10:02:13 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Dec 16 10:03:40 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  2 +
 .../apache/tez/examples/HashJoinExample.java    |  6 ++-
 .../org/apache/tez/examples/JoinValidate.java   |  6 ++-
 .../apache/tez/examples/OrderedWordCount.java   |  7 ++--
 .../tez/examples/SimpleSessionExample.java      |  2 +-
 .../tez/examples/SortMergeJoinExample.java      |  6 ++-
 .../org/apache/tez/examples/TezExampleBase.java | 19 +++++++--
 .../java/org/apache/tez/examples/WordCount.java |  3 +-
 .../RecoveryServiceWithEventHandlingHook.java   |  4 ++
 .../java/org/apache/tez/test/TestLocalMode.java |  2 +-
 .../java/org/apache/tez/test/TestRecovery.java  | 44 ++++++++++++++------
 12 files changed, 75 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0236084..a9d1893 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
 
 ALL CHANGES:
+  TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization.
   TEZ-2997. Tez UI: Support searches by CallerContext ID for DAGs
   TEZ-2996. TestAnalyzer fails in trunk after recovery redesign
   TEZ-2987. TestVertexImpl.testTez2684 fails

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f3cfb58..4e82560 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2231,6 +2231,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
         eventHandler.handle(new VertexEventRouteEvent(getVertexId(), initGeneratedEvents));
       }
+      // reset rootInputDescriptor because it may be changed during input initialization.
+      this.rootInputDescriptors = recoveryData.getVertexInitedEvent().getAdditionalInputs();
     } else {
       initedTime = clock.getTime();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
index 680de35..935ccbc 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -157,7 +157,8 @@ public class HashJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, hashPath.toUri().toString())
-                    .groupSplits(!isDisableSplitGrouping()).build());
+                    .groupSplits(!isDisableSplitGrouping())
+                    .generateSplitsInAM(!isGenerateSplitInClient()).build());
 
     /**
      * This vertex represents that side of the data that will be streamed and
@@ -173,7 +174,8 @@ public class HashJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, streamPath.toUri().toString())
-                    .groupSplits(!isDisableSplitGrouping()).build());
+                    .groupSplits(!isDisableSplitGrouping())
+                    .generateSplitsInAM(!isGenerateSplitInClient()).build());
 
     /**
      * This vertex represents the join operation. It writes the join output as

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 4883351..186bacd 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -155,14 +155,16 @@ public class JoinValidate extends TezExampleBase {
         ForwardingProcessor.class.getName())).addDataSource("lhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+                lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping())
+                .generateSplitsInAM(!isGenerateSplitInClient()).build());
     setVertexExecutionContext(lhsVertex, getLhsExecutionContext());
 
     Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
-                rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+                rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping())
+                .generateSplitsInAM(!isGenerateSplitInClient()).build());
     setVertexExecutionContext(rhsVertex, getRhsExecutionContext());
 
     Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index fff7c1b..6596809 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -123,10 +123,11 @@ public class OrderedWordCount extends TezExampleBase {
   }
   
   public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
-      int numPartitions, boolean disableSplitGrouping, String dagName) throws IOException {
+      int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient, String dagName) throws IOException {
 
     DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
-        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build();
+        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping)
+          .generateSplitsInAM(!isGenerateSplitInClient).build();
 
     DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
         TextOutputFormat.class, outputPath).build();
@@ -198,7 +199,7 @@ public class OrderedWordCount extends TezExampleBase {
       TezClient tezClient) throws Exception {
     DAG dag = createDAG(tezConf, args[0], args[1],
         args.length == 3 ? Integer.parseInt(args[2]) : 1, isDisableSplitGrouping(),
-        "OrderedWordCount");
+        isGenerateSplitInClient(), "OrderedWordCount");
     LOG.info("Running OrderedWordCount");
     return runDag(dag, isCountersLog(), LOG);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index 08a4b12..d555f47 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -87,7 +87,7 @@ public class SimpleSessionExample extends TezExampleBase {
 
     for (int i = 0; i < inputPaths.length; ++i) {
       DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions,
-          isDisableSplitGrouping(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+          isDisableSplitGrouping(), isGenerateSplitInClient(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
 
       LOG.info("Running dag number " + i);
       if(runDag(dag, isCountersLog(), LOG) != 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
index 5a0cbd8..1054e00 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -158,7 +158,8 @@ public class SortMergeJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, inputPath1.toUri().toString())
-                    .groupSplits(!isDisableSplitGrouping()).build());
+                    .groupSplits(!isDisableSplitGrouping())
+                    .generateSplitsInAM(!isGenerateSplitInClient()).build());
 
     /**
      * The other vertex represents the other side of the join. It reads text
@@ -173,7 +174,8 @@ public class SortMergeJoinExample extends TezExampleBase {
                 MRInput
                     .createConfigBuilder(new Configuration(tezConf),
                         TextInputFormat.class, inputPath2.toUri().toString())
-                    .groupSplits(!isDisableSplitGrouping()).build());
+                    .groupSplits(!isDisableSplitGrouping())
+                    .generateSplitsInAM(!isGenerateSplitInClient()).build());
 
     /**
      * This vertex represents the join operation. It writes the join output as

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 281eaa9..6960559 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -58,10 +58,12 @@ public abstract class TezExampleBase extends Configured implements Tool {
   protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping";
   protected static final String LOCAL_MODE = "local";
   protected static final String COUNTER_LOG = "counter";
+  protected static final String GENERATE_SPLIT_IN_CLIENT = "generateSplitInClient";
 
   private boolean disableSplitGrouping = false;
   private boolean isLocalMode = false;
   private boolean isCountersLog = false;
+  private boolean generateSplitInClient = false;
 
   protected boolean isCountersLog() {
 	  return isCountersLog;
@@ -71,11 +73,16 @@ public abstract class TezExampleBase extends Configured implements Tool {
     return disableSplitGrouping;
   }
 
+  protected boolean isGenerateSplitInClient() {
+    return generateSplitInClient;
+  }
+
   private Options getExtraOptions() {
     Options options = new Options();
     options.addOption(LOCAL_MODE, false, "run it as local mode");
     options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping");
     options.addOption(COUNTER_LOG, false , "print counter log");
+    options.addOption(GENERATE_SPLIT_IN_CLIENT, false, "whether generate split in client");
     return options;
   }
 
@@ -91,9 +98,11 @@ public abstract class TezExampleBase extends Configured implements Tool {
       disableSplitGrouping = true;
     }
     if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) {
-        isCountersLog = true;
+      isCountersLog = true;
+    }
+    if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
+      generateSplitInClient = true;
     }
-
     return _execute(otherArgs, null, null);
   }
 
@@ -124,7 +133,10 @@ public abstract class TezExampleBase extends Configured implements Tool {
       disableSplitGrouping = true;
     }
     if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) {
-        isCountersLog = true;
+      isCountersLog = true;
+    }
+    if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
+      generateSplitInClient = true;
     }
     String[] otherArgs = optionParser.getRemainingArgs();
     return _execute(otherArgs, conf, tezClient);
@@ -238,6 +250,7 @@ public abstract class TezExampleBase extends Configured implements Tool {
     ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
         + " enable split grouping without this option.");
     ps.println("-" + COUNTER_LOG + "\t\t to print counters information");
+    ps.println("-" + GENERATE_SPLIT_IN_CLIENT + "\t\tgenerate input split in client");
     ps.println();
     ps.println("The Tez example extra options usage syntax is ");
     ps.println("example_name [extra_options] [example_parameters]");

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index 3578267..6149193 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -143,7 +143,8 @@ public class WordCount extends TezExampleBase {
     // Create the descriptor that describes the input data to Tez. Using MRInput to read text 
     // data from the given input path. The TextInputFormat is used to read the text data.
     DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
-        TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()).build();
+        TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping())
+          .generateSplitsInAM(!isGenerateSplitInClient()).build();
 
     // Create a descriptor that describes the output data to Tez. Using MROoutput to write text
     // data to the given output path. The TextOutputFormat is used to write the text data.

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index cec8fbd..8a0f39e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -222,6 +222,10 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
     public SimpleShutdownCondition() {
     }
 
+    public HistoryEvent getHistoryEvent() {
+      return this.event;
+    }
+
     private String encodeHistoryEvent(HistoryEvent event) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       event.toProtoStream(out);

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 3a03739..2a5b65f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -230,7 +230,7 @@ public class TestLocalMode {
     try {
       for (int i=0; i<inputPaths.length; ++i) {
         DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], 1,
-            false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+            false, false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
 
         tezClient.waitTillReady();
         System.out.println("Running dag number " + i);

http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index 4f5ef1a..dc26167 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -52,6 +52,7 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
@@ -70,6 +71,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.OrderedWordCount;
+import org.apache.tez.examples.TezExampleBase;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook;
@@ -239,13 +241,15 @@ public class TestRecovery {
       // randomly choose half of the test scenario to avoid
       // timeout.
       if (rand.nextDouble() < 0.5) {
-        testOrderedWordCount(shutdownConditions.get(i), true);
+        // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976)
+        testOrderedWordCount(shutdownConditions.get(i), true,
+            shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
       }
     }
   }
 
   private void testOrderedWordCount(SimpleShutdownCondition shutdownCondition,
-      boolean enableAutoParallelism) throws Exception {
+      boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
     LOG.info("shutdownCondition:" + shutdownCondition.getEventType()
         + ", event=" + shutdownCondition.getEvent());
     String inputDirStr = "/tmp/owc-input/";
@@ -276,11 +280,16 @@ public class TestRecovery {
     tezConf.setBoolean(
         TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
-
     OrderedWordCount job = new OrderedWordCount();
-    Assert
-        .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[] {
-            inputDirStr, outputDirStr, "5" }, null) == 0);
+    if (generateSplitInClient) {
+      Assert
+          .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+              "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0);
+    } else {
+      Assert
+          .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+              inputDirStr, outputDirStr, "5"}, null) == 0);
+    }
     TestTezJobs.verifyOutput(outputDir, remoteFs);
     List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser
         .readRecoveryEvents(tezConf, job.getAppId(), 1);
@@ -392,13 +401,15 @@ public class TestRecovery {
       // randomly choose half of the test scenario to avoid
       // timeout.
       if (rand.nextDouble() < 0.5) {
-        testHashJoinExample(shutdownConditions.get(i), true);
+        // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976)
+        testHashJoinExample(shutdownConditions.get(i), true,
+            shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
       }
     }
   }
 
   private void testHashJoinExample(SimpleShutdownCondition shutdownCondition,
-      boolean enableAutoParallelism) throws Exception {
+      boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
     HashJoinExample hashJoinExample = new HashJoinExample();
     TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
     tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
@@ -449,10 +460,19 @@ public class TestRecovery {
     out1.close();
     out2.close();
 
-    String[] args = new String[] {
-        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
-            + stagingDirPath.toString(), inPath1.toString(),
-        inPath2.toString(), "1", outPath.toString() };
+    String[] args = null;
+    if (generateSplitInClient) {
+      args = new String[]{
+          "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
+              + stagingDirPath.toString(),
+          "-generateSplitInClient",
+          inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
+    } else {
+      args = new String[]{
+          "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
+              + stagingDirPath.toString(),
+          inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
+    }
     assertEquals(0, hashJoinExample.run(args));
 
     FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {