You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/11 00:17:35 UTC
git commit: TEZ-676. Tez job fails on client side if nodemanager
running AM is lost. (Tsuyoshi Ozawa and hitesh via hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 98b49f227 -> 2118e9644
TEZ-676. Tez job fails on client side if nodemanager running AM is lost. (Tsuyoshi Ozawa and hitesh via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2118e964
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2118e964
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2118e964
Branch: refs/heads/master
Commit: 2118e96446bd3ca8858c831a5cf7ebc07d683007
Parents: 98b49f2
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Mar 10 16:16:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Mar 10 16:16:17 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/DAGAppMaster.java | 40 +++++++++++++++----
.../app/dag/event/DAGAppMasterEventType.java | 1 +
.../apache/tez/dag/app/rm/TaskScheduler.java | 42 +++++++++++++-------
.../dag/app/rm/TaskSchedulerEventHandler.java | 8 +++-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +
5 files changed, 70 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c8185b8..ae2bc64 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -331,9 +331,13 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
- taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+ this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
addIfService(taskSchedulerEventHandler, true);
+ if (isLastAMRetry) {
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ }
+
dispatcher.register(AMSchedulerEventType.class,
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
@@ -420,6 +424,7 @@ public class DAGAppMaster extends AbstractService {
sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
} else {
LOG.info("Internal Error. Finishing directly as no dag is active.");
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
shutdownHandler.shutdown();
}
break;
@@ -427,6 +432,7 @@ public class DAGAppMaster extends AbstractService {
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
if (!isSession) {
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" +
@@ -463,22 +469,29 @@ public class DAGAppMaster extends AbstractService {
+ finishEvt.getDAGState()
+ ". Error. Shutting down.");
state = DAGAppMasterState.ERROR;
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
shutdownHandler.shutdown();
break;
}
if (!state.equals(DAGAppMasterState.ERROR)) {
if (!sessionStopped.get()) {
LOG.info("Waiting for next DAG to be submitted.");
- taskSchedulerEventHandler.dagCompleted();
+ this.taskSchedulerEventHandler.dagCompleted();
state = DAGAppMasterState.IDLE;
} else {
LOG.info("Session shutting down now.");
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
state = DAGAppMasterState.SUCCEEDED;
shutdownHandler.shutdown();
}
}
}
break;
+ case AM_REBOOT:
+ LOG.info("Received an AM_REBOOT signal");
+ this.state = DAGAppMasterState.KILLED;
+ shutdownHandler.shutdown(true);
+ break;
default:
throw new TezUncheckedException(
"AppMaster: No handler for event type: " + event.getType());
@@ -510,6 +523,10 @@ public class DAGAppMaster extends AbstractService {
private AtomicBoolean shutdownHandled = new AtomicBoolean(false);
public void shutdown() {
+ shutdown(false);
+ }
+
+ public void shutdown(boolean now) {
if(!shutdownHandled.compareAndSet(false, true)) {
LOG.info("Ignoring multiple shutdown events");
return;
@@ -517,20 +534,28 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Handling DAGAppMaster shutdown");
- AMShutdownRunnable r = new AMShutdownRunnable();
+ AMShutdownRunnable r = new AMShutdownRunnable(now);
Thread t = new Thread(r, "AMShutdownThread");
t.start();
}
private class AMShutdownRunnable implements Runnable {
+ private final boolean immediateShutdown;
+
+ public AMShutdownRunnable(boolean immediateShutdown) {
+ this.immediateShutdown = immediateShutdown;
+ }
+
@Override
public void run() {
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on. TEZ-160.
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ if (!immediateShutdown) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
try {
@@ -837,6 +862,7 @@ public class DAGAppMaster extends AbstractService {
synchronized void shutdownTezAM() {
sessionStopped.set(true);
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
if (currentDAG != null
&& !currentDAG.isComplete()) {
//send a DAG_KILL message
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
index ea22abf..26c96a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
@@ -20,5 +20,6 @@ package org.apache.tez.dag.app.dag.event;
public enum DAGAppMasterEventType {
INTERNAL_ERROR,
+ AM_REBOOT,
DAG_FINISHED
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 36c90ca..bb06904 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -159,7 +159,7 @@ public class TaskScheduler extends AbstractService
final String appTrackingUrl;
final AppContext appContext;
- boolean isStopped = false;
+ AtomicBoolean isStopped = new AtomicBoolean(false);
private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
@@ -168,7 +168,10 @@ public class TaskScheduler extends AbstractService
DelayedContainerManager delayedContainerManager;
long localitySchedulingDelay;
long sessionDelay;
-
+
+ @VisibleForTesting
+ protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
+
class CRCookie {
// Do not use these variables directly. Can cause mocked unit tests to fail.
private Object task;
@@ -270,7 +273,11 @@ public class TaskScheduler extends AbstractService
return new TaskSchedulerAppCallbackWrapper(realAppClient,
appCallbackExecutor);
}
-
+
+ public void setShouldUnregister() {
+ this.shouldUnregister.set(true);
+ }
+
// AbstractService methods
@Override
public synchronized void serviceInit(Configuration conf) {
@@ -347,17 +354,22 @@ public class TaskScheduler extends AbstractService
@Override
public void serviceStop() throws InterruptedException {
// upcall to app outside of locks
- AppFinalStatus status = appClientDelegate.getFinalAppStatus();
try {
delayedContainerManager.shutdown();
// Wait for containers to be released.
delayedContainerManager.join(2000l);
- // TODO TEZ-36 dont unregister automatically after reboot sent by RM
synchronized (this) {
- isStopped = true;
- amRmClient.unregisterApplicationMaster(status.exitStatus,
- status.exitMessage,
- status.postCompletionTrackingUrl);
+ isStopped.set(true);
+ if (shouldUnregister.get()) {
+ AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+ LOG.info("Unregistering application from RM"
+ + ", exitStatus=" + status.exitStatus
+ + ", exitMessage=" + status.exitMessage
+ + ", trackingURL=" + status.postCompletionTrackingUrl);
+ amRmClient.unregisterApplicationMaster(status.exitStatus,
+ status.exitMessage,
+ status.postCompletionTrackingUrl);
+ }
}
// call client.stop() without lock client will attempt to stop the callback
@@ -378,7 +390,7 @@ public class TaskScheduler extends AbstractService
// AMRMClientAsync interface methods
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
- if(isStopped) {
+ if (isStopped.get()) {
return;
}
Map<Object, ContainerStatus> appContainerStatus =
@@ -435,7 +447,7 @@ public class TaskScheduler extends AbstractService
@Override
public void onContainersAllocated(List<Container> containers) {
- if (isStopped) {
+ if (isStopped.get()) {
return;
}
Map<CookieContainerRequest, Container> assignedContainers;
@@ -736,7 +748,7 @@ public class TaskScheduler extends AbstractService
@Override
public void onShutdownRequest() {
- if(isStopped) {
+ if (isStopped.get()) {
return;
}
// upcall to app must be outside locks
@@ -745,7 +757,7 @@ public class TaskScheduler extends AbstractService
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
- if(isStopped) {
+ if (isStopped.get()) {
return;
}
// ignore bad nodes for now
@@ -755,7 +767,7 @@ public class TaskScheduler extends AbstractService
@Override
public float getProgress() {
- if(isStopped) {
+ if (isStopped.get()) {
return 1;
}
@@ -776,7 +788,7 @@ public class TaskScheduler extends AbstractService
@Override
public void onError(Throwable t) {
- if(isStopped) {
+ if (isStopped.get()) {
return;
}
appClientDelegate.onError(t);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index a597ee1..ebc4111 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -458,7 +458,7 @@ public class TaskSchedulerEventHandler extends AbstractService
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
LOG.info("App shutdown requested by scheduler");
- sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
+ sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
}
@Override
@@ -550,4 +550,10 @@ public class TaskSchedulerEventHandler extends AbstractService
sendEvent(new AMContainerEventCompleted(ContainerStatus.newInstance(
containerId, ContainerState.COMPLETE, "Container Preempted Internally", -1), true));
}
+
+ public void setShouldUnregisterFlag() {
+ this.taskScheduler.setShouldUnregister();
+ LOG.info("TaskScheduler notified that it should unregister from RM");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index f7601a6..087da83 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -195,6 +195,7 @@ class TestTaskSchedulerHelpers {
AppContext appContext) {
super(appClient, containerSignatureMatcher, appHostName, appHostPort,
appTrackingUrl, appContext);
+ shouldUnregister.set(true);
}
public TaskSchedulerWithDrainableAppCallback(
@@ -205,6 +206,7 @@ class TestTaskSchedulerHelpers {
AppContext appContext) {
super(appClient, containerSignatureMatcher, appHostName, appHostPort,
appTrackingUrl, client, appContext);
+ shouldUnregister.set(true);
}
@Override