You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/01/21 21:30:22 UTC

tez git commit: TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe) (cherry picked from commit 92def52ff8b02eab7aae38170ca6c9b0caf83ef7)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 238c3ad8f -> 2cbf40248


TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe)
(cherry picked from commit 92def52ff8b02eab7aae38170ca6c9b0caf83ef7)

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2cbf4024
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2cbf4024
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2cbf4024

Branch: refs/heads/branch-0.7
Commit: 2cbf40248f7abe49bab4ac78d3ecff1a040d52e1
Parents: 238c3ad
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 20:30:07 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 20:30:07 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 48 ++++++++++++++------
 2 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd3e64a..f40029a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-3036. Tez AM can hang on startup with no indication of error
   TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-3046. Compilation issue in tez-runtime-internals of branch-0.7
   TEZ-2937. Can Processor.close() be called after closing inputs and outputs?

http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8e6c21a..c163b62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1592,7 +1592,18 @@ public class DAGAppMaster extends AbstractService {
         LOG.debug("Service dependency: " + dependency.getName() + " notify" +
                   " for service: " + service.getName());
       }
-      if (dependency.isInState(Service.STATE.STARTED)) {
+      Throwable dependencyError = dependency.getFailureCause();
+      if (dependencyError != null) {
+        synchronized(this) {
+          dependenciesFailed = true;
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Service: " + service.getName() + " will fail to start"
+                + " as dependent service " + dependency.getName()
+                + " failed to start: " + dependencyError);
+          }
+          this.notifyAll();
+        }
+      } else if (dependency.isInState(Service.STATE.STARTED)) {
         if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
           synchronized(this) {
             if(LOG.isDebugEnabled()) {
@@ -1602,17 +1613,6 @@ public class DAGAppMaster extends AbstractService {
             this.notifyAll();
           }
         }
-      } else if (!service.isInState(Service.STATE.STARTED)
-          && dependency.getFailureState() != null) {
-        synchronized(this) {
-          dependenciesFailed = true;
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Service: " + service.getName() + " will fail to start"
-                + " as dependent service " + dependency.getName()
-                + " failed to start");
-          }
-          this.notifyAll();
-        }
       }
     }
 
@@ -1646,9 +1646,12 @@ public class DAGAppMaster extends AbstractService {
 
   private static class ServiceThread extends Thread {
     final ServiceWithDependency serviceWithDependency;
-    Throwable error = null;
-    public ServiceThread(ServiceWithDependency serviceWithDependency) {
+    final Map<Service, ServiceWithDependency> services;
+    volatile Throwable error = null;
+    public ServiceThread(ServiceWithDependency serviceWithDependency,
+        Map<Service, ServiceWithDependency> services) {
       this.serviceWithDependency = serviceWithDependency;
+      this.services = services;
       this.setName("ServiceThread:" + serviceWithDependency.service.getName());
     }
 
@@ -1660,7 +1663,14 @@ public class DAGAppMaster extends AbstractService {
       try {
         serviceWithDependency.start();
       } catch (Throwable t) {
+        // AbstractService does not notify listeners if something throws, so
+        // notify dependent services explicitly to prevent hanging.
+        // AbstractService only records fault causes for exceptions, not
+        // errors, so dependent services will proceed thinking startup
+        // succeeded if an error is thrown. The error will be noted when the
+        // main thread joins the ServiceThread.
         error = t;
+        notifyDependentServices();
       } finally {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Service: " + serviceWithDependency.service.getName() +
@@ -1672,6 +1682,14 @@ public class DAGAppMaster extends AbstractService {
             + serviceWithDependency.service.getName());
       }
     }
+
+    private void notifyDependentServices() {
+      for (ServiceWithDependency otherSvc : services.values()) {
+        if (otherSvc.dependencies.contains(serviceWithDependency.service)) {
+          otherSvc.stateChanged(serviceWithDependency.service);
+        }
+      }
+    }
   }
 
   void startServices(){
@@ -1684,7 +1702,7 @@ public class DAGAppMaster extends AbstractService {
       for(ServiceWithDependency sd : services.values()) {
         // start the service. If this fails that service
         // will be stopped and an exception raised
-        ServiceThread st = new ServiceThread(sd);
+        ServiceThread st = new ServiceThread(sd, services);
         threads.add(st);
       }