You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/15 22:57:18 UTC
[2/2] git commit: TEZ-1429. Avoid sysexit in the DAGAM in case of local mode. (sseth)
TEZ-1429. Avoid sysexit in the DAGAM in case of local mode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbc385de
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbc385de
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbc385de
Branch: refs/heads/master
Commit: cbc385de28681c7ad8902825a7aa8f1a6f790e51
Parents: ebd14e5
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 15 13:56:23 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 15 13:56:23 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/api/TezConstants.java | 2 +
.../java/org/apache/tez/client/LocalClient.java | 26 ++++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 8 +-
.../org/apache/tez/client/TestLocalMode.java | 101 ++++++++++++++++---
4 files changed, 120 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 537013a..ba634af 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -101,4 +101,6 @@ public class TezConstants {
* such as killing the DAG.
*/
public static final String TEZ_DAG_MODIFY_ACLS = TezConfiguration.TEZ_AM_PREFIX + "dag.modify-acls";
+
+ public static final long TEZ_DAG_SLEEP_TIME_BEFORE_EXIT = 5000;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 7939582..0b615fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -139,6 +140,8 @@ public class LocalClient extends FrameworkClient {
report.setRpcPort(dagAppMaster.getRpcPort());
report.setClientToAMToken(null);
report.setYarnApplicationState(convertDAGAppMasterState(dagAppMaster.getState()));
+ report.setFinalApplicationStatus(convertDAGAppMasterStateToFinalYARNState(dagAppMaster.getState()));
+
List<String> diagnostics = dagAppMaster.getDiagnostics();
if (diagnostics != null) {
@@ -146,7 +149,6 @@ public class LocalClient extends FrameworkClient {
}
report.setTrackingUrl("N/A");
report.setFinishTime(0);
- report.setFinalApplicationStatus(null);
report.setApplicationResourceUsageReport(null);
report.setOriginalTrackingUrl("N/A");
report.setProgress(dagAppMaster.getProgress());
@@ -155,6 +157,28 @@ public class LocalClient extends FrameworkClient {
return report;
}
+ protected FinalApplicationStatus convertDAGAppMasterStateToFinalYARNState(
+ DAGAppMasterState dagAppMasterState) {
+ switch (dagAppMasterState) {
+ case NEW:
+ case INITED:
+ case RECOVERING:
+ case IDLE:
+ case RUNNING:
+ return FinalApplicationStatus.UNDEFINED;
+ case SUCCEEDED:
+ return FinalApplicationStatus.SUCCEEDED;
+ case FAILED:
+ return FinalApplicationStatus.FAILED;
+ case KILLED:
+ return FinalApplicationStatus.KILLED;
+ case ERROR:
+ return FinalApplicationStatus.FAILED;
+ default:
+ return FinalApplicationStatus.UNDEFINED;
+ }
+ }
+
protected YarnApplicationState convertDAGAppMasterState(DAGAppMasterState dagAppMasterState) {
switch (dagAppMasterState) {
case NEW:
http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3e65aed..be9e28d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -304,7 +304,7 @@ public class DAGAppMaster extends AbstractService {
UserGroupInformation.setConfiguration(conf);
appMasterUgi = UserGroupInformation.getCurrentUser();
}
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, !isLocal);
String strAppId = this.appAttemptID.getApplicationId().toString();
this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
@@ -447,7 +447,9 @@ public class DAGAppMaster extends AbstractService {
* Exit call. Just in a function call to enable testing.
*/
protected void sysexit() {
- System.exit(0);
+ if (!isLocal) {
+ System.exit(0);
+ }
}
private synchronized void handle(DAGAppMasterEvent event) {
@@ -610,7 +612,7 @@ public class DAGAppMaster extends AbstractService {
if (!immediateShutdown) {
try {
LOG.info("Sleeping for 5 seconds before shutting down");
- Thread.sleep(5000);
+ Thread.sleep(TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT);
} catch (InterruptedException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cbc385de/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java b/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
index d6a2de7..f0d84e6 100644
--- a/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
+++ b/tez-dag/src/test/java/org/apache/tez/client/TestLocalMode.java
@@ -20,28 +20,31 @@ package org.apache.tez.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
-import junit.framework.Assert;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.junit.Test;
public class TestLocalMode {
- @Test
+ @Test(timeout = 10000)
public void testMultipleClientsWithSession() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
@@ -50,7 +53,7 @@ public class TestLocalMode {
TezClient tezClient1 = new TezClient("commonName", tezConf1, true);
tezClient1.start();
- DAG dag1 = createSimpleSleepDAG("dag1");
+ DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
@@ -63,7 +66,7 @@ public class TestLocalMode {
TezConfiguration tezConf2 = new TezConfiguration();
tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf2.set("fs.defaultFS", "file:///");
- DAG dag2 = createSimpleSleepDAG("dag2");
+ DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = new TezClient("commonName", tezConf2, true);
tezClient2.start();
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
@@ -74,7 +77,7 @@ public class TestLocalMode {
tezClient2.stop();
}
- @Test
+ @Test(timeout = 10000)
public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
@@ -83,7 +86,7 @@ public class TestLocalMode {
TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
tezClient1.start();
- DAG dag1 = createSimpleSleepDAG("dag1");
+ DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
@@ -96,7 +99,7 @@ public class TestLocalMode {
TezConfiguration tezConf2 = new TezConfiguration();
tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf2.set("fs.defaultFS", "file:///");
- DAG dag2 = createSimpleSleepDAG("dag2");
+ DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = new TezClient("commonName", tezConf2, false);
tezClient2.start();
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
@@ -107,9 +110,81 @@ public class TestLocalMode {
tezClient2.stop();
}
- private DAG createSimpleSleepDAG(String dagName) {
+ @Test(timeout = 20000)
+ public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
+ IOException {
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ // Run in non-session mode so that the AM terminates
+ TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
+ tezClient1.start();
+
+ DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
+
+ DAGClient dagClient1 = tezClient1.submitDAG(dag1);
+ dagClient1.waitForCompletion();
+ assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
+
+ // Sleep past the AM's pre-exit delay, so a stray System.exit would happen while the test runs.
+ Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
+
+ dagClient1.close();
+ tezClient1.stop();
+ }
+
+ @Test(timeout = 20000)
+ public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
+ IOException {
+ TezConfiguration tezConf1 = new TezConfiguration();
+ tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ tezConf1.set("fs.defaultFS", "file:///");
+ // Run in non-session mode so that the AM terminates
+ TezClient tezClient1 = new TezClient("commonName", tezConf1, false);
+ tezClient1.start();
+
+ DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName());
+
+ DAGClient dagClient1 = tezClient1.submitDAG(dag1);
+ dagClient1.waitForCompletion();
+ assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
+
+ // Sleep past the AM's pre-exit delay, so a stray System.exit would happen while the test runs.
+ Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
+
+ dagClient1.close();
+ tezClient1.stop();
+ }
+
+ public static class FailingProcessor extends AbstractLogicalIOProcessor {
+
+ public FailingProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+ Exception {
+ throw new TezException("FailingProcessor");
+
+ }
+ }
+
+ private DAG createSimpleDAG(String dagName, String processorName) {
DAG dag = new DAG(dagName).addVertex(new Vertex("Sleep", new ProcessorDescriptor(
- SleepProcessor.class.getName()).setUserPayload(
+ processorName).setUserPayload(
new UserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload())), 1));
return dag;