You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/15 22:57:18 UTC

[2/2] git commit: TEZ-1429. Avoid sysexit in the DAGAM in case of local mode. (sseth)

TEZ-1429. Avoid sysexit in the DAGAM in case of local mode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbc385de
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbc385de
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbc385de

Branch: refs/heads/master
Commit: cbc385de28681c7ad8902825a7aa8f1a6f790e51
Parents: ebd14e5
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 15 13:56:23 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 15 13:56:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/api/TezConstants.java    |   2 +
 .../java/org/apache/tez/client/LocalClient.java |  26 ++++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +-
 .../org/apache/tez/client/TestLocalMode.java    | 101 ++++++++++++++++---
 4 files changed, 120 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 537013a..ba634af 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -101,4 +101,6 @@ public class TezConstants {
    * such as killing the DAG.
    */
   public static final String TEZ_DAG_MODIFY_ACLS = TezConfiguration.TEZ_AM_PREFIX + "dag.modify-acls";
+
+  public static final long TEZ_DAG_SLEEP_TIME_BEFORE_EXIT = 5000;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 7939582..0b615fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -139,6 +140,8 @@ public class LocalClient extends FrameworkClient {
     report.setRpcPort(dagAppMaster.getRpcPort());
     report.setClientToAMToken(null);
     report.setYarnApplicationState(convertDAGAppMasterState(dagAppMaster.getState()));
+    report.setFinalApplicationStatus(convertDAGAppMasterStateToFinalYARNState(dagAppMaster.getState()));
+
 
     List<String> diagnostics = dagAppMaster.getDiagnostics();
     if (diagnostics != null) {
@@ -146,7 +149,6 @@ public class LocalClient extends FrameworkClient {
     }
     report.setTrackingUrl("N/A");
     report.setFinishTime(0);
-    report.setFinalApplicationStatus(null);
     report.setApplicationResourceUsageReport(null);
     report.setOriginalTrackingUrl("N/A");
     report.setProgress(dagAppMaster.getProgress());
@@ -155,6 +157,28 @@ public class LocalClient extends FrameworkClient {
     return report;
   }
 
+  protected FinalApplicationStatus convertDAGAppMasterStateToFinalYARNState(
+      DAGAppMasterState dagAppMasterState) {
+    switch (dagAppMasterState) {
+      case NEW:
+      case INITED:
+      case RECOVERING:
+      case IDLE:
+      case RUNNING:
+        return FinalApplicationStatus.UNDEFINED;
+      case SUCCEEDED:
+        return FinalApplicationStatus.SUCCEEDED;
+      case FAILED:
+        return FinalApplicationStatus.FAILED;
+      case KILLED:
+        return FinalApplicationStatus.KILLED;
+      case ERROR:
+        return FinalApplicationStatus.FAILED;
+      default:
+        return FinalApplicationStatus.UNDEFINED;
+    }
+  }
+
   protected YarnApplicationState convertDAGAppMasterState(DAGAppMasterState dagAppMasterState) {
     switch (dagAppMasterState) {
     case NEW:

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3e65aed..be9e28d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -304,7 +304,7 @@ public class DAGAppMaster extends AbstractService {
        UserGroupInformation.setConfiguration(conf);
        appMasterUgi = UserGroupInformation.getCurrentUser();
     }
-    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, !isLocal);
     String strAppId = this.appAttemptID.getApplicationId().toString();
     this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
 
@@ -447,7 +447,9 @@ public class DAGAppMaster extends AbstractService {
    * Exit call. Just in a function call to enable testing.
    */
   protected void sysexit() {
-    System.exit(0);
+    if (!isLocal) {
+      System.exit(0);
+    }
   }
 
   private synchronized void handle(DAGAppMasterEvent event) {
@@ -610,7 +612,7 @@ public class DAGAppMaster extends AbstractService {
         if (!immediateShutdown) {
           try {
             LOG.info("Sleeping for 5 seconds before shutting down");
-            Thread.sleep(5000);
+            Thread.sleep(TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT);
           } catch (InterruptedException e) {
             e.printStackTrace();
           }

http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java b/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
index d6a2de7..f0d84e6 100644
--- a/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
+++ b/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
@@ -20,28 +20,31 @@ package org.apache.tez.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
-import junit.framework.Assert;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.junit.Test;
 
 public class TestLocalMode {
 
-  @Test
+  @Test(timeout = 10000)
   public void testMultipleClientsWithSession() throws TezException, InterruptedException,
       IOException {
     TezConfiguration tezConf1 = new TezConfiguration();
@@ -50,7 +53,7 @@ public class TestLocalMode {
     TezClient tezClient1 = new TezClient("commonName", tezConf1, true);
     tezClient1.start();
 
-    DAG dag1 = createSimpleSleepDAG("dag1");
+    DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
 
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
@@ -63,7 +66,7 @@ public class TestLocalMode {
     TezConfiguration tezConf2 = new TezConfiguration();
     tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
     tezConf2.set("fs.defaultFS", "file:///");
-    DAG dag2 = createSimpleSleepDAG("dag2");
+    DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = new TezClient("commonName", tezConf2, true);
     tezClient2.start();
     DAGClient dagClient2 = tezClient2.submitDAG(dag2);
@@ -74,7 +77,7 @@ public class TestLocalMode {
     tezClient2.stop();
   }
 
-  @Test
+  @Test(timeout = 10000)
   public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
       IOException {
     TezConfiguration tezConf1 = new TezConfiguration();
@@ -83,7 +86,7 @@ public class TestLocalMode {
     TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
     tezClient1.start();
 
-    DAG dag1 = createSimpleSleepDAG("dag1");
+    DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
 
     DAGClient dagClient1 = tezClient1.submitDAG(dag1);
     dagClient1.waitForCompletion();
@@ -96,7 +99,7 @@ public class TestLocalMode {
     TezConfiguration tezConf2 = new TezConfiguration();
     tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
     tezConf2.set("fs.defaultFS", "file:///");
-    DAG dag2 = createSimpleSleepDAG("dag2");
+    DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
     TezClient tezClient2 = new TezClient("commonName", tezConf2, false);
     tezClient2.start();
     DAGClient dagClient2 = tezClient2.submitDAG(dag2);
@@ -107,9 +110,81 @@ public class TestLocalMode {
     tezClient2.stop();
   }
 
-  private DAG createSimpleSleepDAG(String dagName) {
+  @Test(timeout = 20000)
+  public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
+      IOException {
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    // Run in non-session mode so that the AM terminates
+    TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
+    tezClient1.start();
+
+    DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
+
+    DAGClient dagClient1 = tezClient1.submitDAG(dag1);
+    dagClient1.waitForCompletion();
+    assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
+
+    // Sleep for more time than is required for the DAG to complete.
+    Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
+
+    dagClient1.close();
+    tezClient1.stop();
+  }
+
+  @Test(timeout = 20000)
+  public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
+      IOException {
+    TezConfiguration tezConf1 = new TezConfiguration();
+    tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf1.set("fs.defaultFS", "file:///");
+    // Run in non-session mode so that the AM terminates
+    TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
+    tezClient1.start();
+
+    DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName());
+
+    DAGClient dagClient1 = tezClient1.submitDAG(dag1);
+    dagClient1.waitForCompletion();
+    assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
+
+    // Sleep for more time than is required for the DAG to complete.
+    Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
+
+    dagClient1.close();
+    tezClient1.stop();
+  }
+
+  public static class FailingProcessor extends AbstractLogicalIOProcessor {
+
+    public FailingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+        Exception {
+      throw new TezException("FailingProcessor");
+
+    }
+  }
+
+  private DAG createSimpleDAG(String dagName, String processorName) {
     DAG dag = new DAG(dagName).addVertex(new Vertex("Sleep", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(
+        processorName).setUserPayload(
         new UserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload())), 1));
     return dag;