You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/11 00:17:35 UTC

git commit: TEZ-676. Tez job fails on client side if nodemanager running AM is lost. (Tsuyoshi Ozawa and hitesh via hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 98b49f227 -> 2118e9644


TEZ-676. Tez job fails on client side if nodemanager running AM is lost. (Tsuyoshi Ozawa and hitesh via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2118e964
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2118e964
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2118e964

Branch: refs/heads/master
Commit: 2118e96446bd3ca8858c831a5cf7ebc07d683007
Parents: 98b49f2
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Mar 10 16:16:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Mar 10 16:16:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 40 +++++++++++++++----
 .../app/dag/event/DAGAppMasterEventType.java    |  1 +
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 42 +++++++++++++-------
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  8 +++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  2 +
 5 files changed, 70 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c8185b8..ae2bc64 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -331,9 +331,13 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(TaskAttemptEventType.class,
         new TaskAttemptEventDispatcher());
 
-    taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+    this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
     addIfService(taskSchedulerEventHandler, true);
+    if (isLastAMRetry) {
+      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+    }
+
     dispatcher.register(AMSchedulerEventType.class,
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
@@ -420,6 +424,7 @@ public class DAGAppMaster extends AbstractService {
         sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
       } else {
         LOG.info("Internal Error. Finishing directly as no dag is active.");
+        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
         shutdownHandler.shutdown();
       }
       break;
@@ -427,6 +432,7 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventDAGFinished finishEvt =
           (DAGAppMasterEventDAGFinished) event;
       if (!isSession) {
+        this.taskSchedulerEventHandler.setShouldUnregisterFlag();
         _updateLoggers(currentDAG, "_post");
         setStateOnDAGCompletion();
         LOG.info("Shutting down on completion of dag:" +
@@ -463,22 +469,29 @@ public class DAGAppMaster extends AbstractService {
               + finishEvt.getDAGState()
               + ". Error. Shutting down.");
           state = DAGAppMasterState.ERROR;
+          this.taskSchedulerEventHandler.setShouldUnregisterFlag();
           shutdownHandler.shutdown();
           break;
         }
         if (!state.equals(DAGAppMasterState.ERROR)) {
           if (!sessionStopped.get()) {
             LOG.info("Waiting for next DAG to be submitted.");
-            taskSchedulerEventHandler.dagCompleted();
+            this.taskSchedulerEventHandler.dagCompleted();
             state = DAGAppMasterState.IDLE;
           } else {
             LOG.info("Session shutting down now.");
+            this.taskSchedulerEventHandler.setShouldUnregisterFlag();
             state = DAGAppMasterState.SUCCEEDED;
             shutdownHandler.shutdown();
           }
         }
       }
       break;
+    case AM_REBOOT:
+      LOG.info("Received an AM_REBOOT signal");
+      this.state = DAGAppMasterState.KILLED;
+      shutdownHandler.shutdown(true);
+      break;
     default:
       throw new TezUncheckedException(
           "AppMaster: No handler for event type: " + event.getType());
@@ -510,6 +523,10 @@ public class DAGAppMaster extends AbstractService {
     private AtomicBoolean shutdownHandled = new AtomicBoolean(false);
 
     public void shutdown() {
+      shutdown(false);
+    }
+
+    public void shutdown(boolean now) {
       if(!shutdownHandled.compareAndSet(false, true)) {
         LOG.info("Ignoring multiple shutdown events");
         return;
@@ -517,20 +534,28 @@ public class DAGAppMaster extends AbstractService {
 
       LOG.info("Handling DAGAppMaster shutdown");
 
-      AMShutdownRunnable r = new AMShutdownRunnable();
+      AMShutdownRunnable r = new AMShutdownRunnable(now);
       Thread t = new Thread(r, "AMShutdownThread");
       t.start();
     }
 
     private class AMShutdownRunnable implements Runnable {
+      private final boolean immediateShutdown;
+
+      public AMShutdownRunnable(boolean immediateShutdown) {
+        this.immediateShutdown = immediateShutdown;
+      }
+
       @Override
       public void run() {
         // TODO:currently just wait for some time so clients can know the
         // final states. Will be removed once RM come on. TEZ-160.
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
+        if (!immediateShutdown) {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
         }
 
         try {
@@ -837,6 +862,7 @@ public class DAGAppMaster extends AbstractService {
 
   synchronized void shutdownTezAM() {
     sessionStopped.set(true);
+    this.taskSchedulerEventHandler.setShouldUnregisterFlag();
     if (currentDAG != null
         && !currentDAG.isComplete()) {
       //send a DAG_KILL message

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
index ea22abf..26c96a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
@@ -20,5 +20,6 @@ package org.apache.tez.dag.app.dag.event;
 
 public enum DAGAppMasterEventType {
   INTERNAL_ERROR,
+  AM_REBOOT,
   DAG_FINISHED
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 36c90ca..bb06904 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -159,7 +159,7 @@ public class TaskScheduler extends AbstractService
   final String appTrackingUrl;
   final AppContext appContext;
 
-  boolean isStopped = false;
+  AtomicBoolean isStopped = new AtomicBoolean(false);
 
   private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
   private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
@@ -168,7 +168,10 @@ public class TaskScheduler extends AbstractService
   DelayedContainerManager delayedContainerManager;
   long localitySchedulingDelay;
   long sessionDelay;
-  
+
+  @VisibleForTesting
+  protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
+
   class CRCookie {
     // Do not use these variables directly. Can caused mocked unit tests to fail.
     private Object task;
@@ -270,7 +273,11 @@ public class TaskScheduler extends AbstractService
     return new TaskSchedulerAppCallbackWrapper(realAppClient,
         appCallbackExecutor);
   }
-  
+
+  public void setShouldUnregister() {
+    this.shouldUnregister.set(true);
+  }
+
   // AbstractService methods
   @Override
   public synchronized void serviceInit(Configuration conf) {
@@ -347,17 +354,22 @@ public class TaskScheduler extends AbstractService
   @Override
   public void serviceStop() throws InterruptedException {
     // upcall to app outside of locks
-    AppFinalStatus status = appClientDelegate.getFinalAppStatus();
     try {
       delayedContainerManager.shutdown();
       // Wait for contianers to be released.
       delayedContainerManager.join(2000l);
-      // TODO TEZ-36 dont unregister automatically after reboot sent by RM
       synchronized (this) {
-        isStopped = true;
-        amRmClient.unregisterApplicationMaster(status.exitStatus,
-                                               status.exitMessage,
-                                               status.postCompletionTrackingUrl);
+        isStopped.set(true);
+        if (shouldUnregister.get()) {
+          AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+          LOG.info("Unregistering application from RM"
+              + ", exitStatus=" + status.exitStatus
+              + ", exitMessage=" + status.exitMessage
+              + ", trackingURL=" + status.postCompletionTrackingUrl);
+          amRmClient.unregisterApplicationMaster(status.exitStatus,
+              status.exitMessage,
+              status.postCompletionTrackingUrl);
+        }
       }
 
       // call client.stop() without lock client will attempt to stop the callback
@@ -378,7 +390,7 @@ public class TaskScheduler extends AbstractService
   // AMRMClientAsync interface methods
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
-    if(isStopped) {
+    if (isStopped.get()) {
       return;
     }
     Map<Object, ContainerStatus> appContainerStatus =
@@ -435,7 +447,7 @@ public class TaskScheduler extends AbstractService
 
   @Override
   public void onContainersAllocated(List<Container> containers) {
-    if (isStopped) {
+    if (isStopped.get()) {
       return;
     }
     Map<CookieContainerRequest, Container> assignedContainers;
@@ -736,7 +748,7 @@ public class TaskScheduler extends AbstractService
 
   @Override
   public void onShutdownRequest() {
-    if(isStopped) {
+    if (isStopped.get()) {
       return;
     }
     // upcall to app must be outside locks
@@ -745,7 +757,7 @@ public class TaskScheduler extends AbstractService
 
   @Override
   public void onNodesUpdated(List<NodeReport> updatedNodes) {
-    if(isStopped) {
+    if (isStopped.get()) {
       return;
     }
     // ignore bad nodes for now
@@ -755,7 +767,7 @@ public class TaskScheduler extends AbstractService
 
   @Override
   public float getProgress() {
-    if(isStopped) {
+    if (isStopped.get()) {
       return 1;
     }
 
@@ -776,7 +788,7 @@ public class TaskScheduler extends AbstractService
 
   @Override
   public void onError(Throwable t) {
-    if(isStopped) {
+    if (isStopped.get()) {
       return;
     }
     appClientDelegate.onError(t);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index a597ee1..ebc4111 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -458,7 +458,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     // This can happen if the RM has been restarted. If it is in that state,
     // this application must clean itself up.
     LOG.info("App shutdown requested by scheduler");
-    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
+    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
   }
 
   @Override
@@ -550,4 +550,10 @@ public class TaskSchedulerEventHandler extends AbstractService
     sendEvent(new AMContainerEventCompleted(ContainerStatus.newInstance(
         containerId, ContainerState.COMPLETE, "Container Preempted Internally", -1), true));
   }
+
+  public void setShouldUnregisterFlag() {
+    this.taskScheduler.setShouldUnregister();
+    LOG.info("TaskScheduler notified that it should unregister from RM");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2118e964/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index f7601a6..087da83 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -195,6 +195,7 @@ class TestTaskSchedulerHelpers {
         AppContext appContext) {
       super(appClient, containerSignatureMatcher, appHostName, appHostPort,
           appTrackingUrl, appContext);
+      shouldUnregister.set(true);
     }
 
     public TaskSchedulerWithDrainableAppCallback(
@@ -205,6 +206,7 @@ class TestTaskSchedulerHelpers {
         AppContext appContext) {
       super(appClient, containerSignatureMatcher, appHostName, appHostPort,
           appTrackingUrl, client, appContext);
+      shouldUnregister.set(true);
     }
 
     @Override