You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/01/21 21:30:22 UTC
tez git commit: TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe) (cherry picked from commit 92def52ff8b02eab7aae38170ca6c9b0caf83ef7)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 238c3ad8f -> 2cbf40248
TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe)
(cherry picked from commit 92def52ff8b02eab7aae38170ca6c9b0caf83ef7)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2cbf4024
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2cbf4024
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2cbf4024
Branch: refs/heads/branch-0.7
Commit: 2cbf40248f7abe49bab4ac78d3ecff1a040d52e1
Parents: 238c3ad
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 20:30:07 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 20:30:07 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 48 ++++++++++++++------
2 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd3e64a..f40029a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES
+ TEZ-3036. Tez AM can hang on startup with no indication of error
TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
TEZ-3046. Compilation issue in tez-runtime-internals of branch-0.7
TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8e6c21a..c163b62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1592,7 +1592,18 @@ public class DAGAppMaster extends AbstractService {
LOG.debug("Service dependency: " + dependency.getName() + " notify" +
" for service: " + service.getName());
}
- if (dependency.isInState(Service.STATE.STARTED)) {
+ Throwable dependencyError = dependency.getFailureCause();
+ if (dependencyError != null) {
+ synchronized(this) {
+ dependenciesFailed = true;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Service: " + service.getName() + " will fail to start"
+ + " as dependent service " + dependency.getName()
+ + " failed to start: " + dependencyError);
+ }
+ this.notifyAll();
+ }
+ } else if (dependency.isInState(Service.STATE.STARTED)) {
if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
synchronized(this) {
if(LOG.isDebugEnabled()) {
@@ -1602,17 +1613,6 @@ public class DAGAppMaster extends AbstractService {
this.notifyAll();
}
}
- } else if (!service.isInState(Service.STATE.STARTED)
- && dependency.getFailureState() != null) {
- synchronized(this) {
- dependenciesFailed = true;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Service: " + service.getName() + " will fail to start"
- + " as dependent service " + dependency.getName()
- + " failed to start");
- }
- this.notifyAll();
- }
}
}
@@ -1646,9 +1646,12 @@ public class DAGAppMaster extends AbstractService {
private static class ServiceThread extends Thread {
final ServiceWithDependency serviceWithDependency;
- Throwable error = null;
- public ServiceThread(ServiceWithDependency serviceWithDependency) {
+ final Map<Service, ServiceWithDependency> services;
+ volatile Throwable error = null;
+ public ServiceThread(ServiceWithDependency serviceWithDependency,
+ Map<Service, ServiceWithDependency> services) {
this.serviceWithDependency = serviceWithDependency;
+ this.services = services;
this.setName("ServiceThread:" + serviceWithDependency.service.getName());
}
@@ -1660,7 +1663,14 @@ public class DAGAppMaster extends AbstractService {
try {
serviceWithDependency.start();
} catch (Throwable t) {
+ // AbstractService does not notify listeners if something throws, so
+ // notify dependent services explicitly to prevent hanging.
+ // AbstractService only records fault causes for exceptions, not
+ // errors, so dependent services will proceed thinking startup
+ // succeeded if an error is thrown. The error will be noted when the
+ // main thread joins the ServiceThread.
error = t;
+ notifyDependentServices();
} finally {
if(LOG.isDebugEnabled()) {
LOG.debug("Service: " + serviceWithDependency.service.getName() +
@@ -1672,6 +1682,14 @@ public class DAGAppMaster extends AbstractService {
+ serviceWithDependency.service.getName());
}
}
+
+ private void notifyDependentServices() {
+ for (ServiceWithDependency otherSvc : services.values()) {
+ if (otherSvc.dependencies.contains(serviceWithDependency.service)) {
+ otherSvc.stateChanged(serviceWithDependency.service);
+ }
+ }
+ }
}
void startServices(){
@@ -1684,7 +1702,7 @@ public class DAGAppMaster extends AbstractService {
for(ServiceWithDependency sd : services.values()) {
// start the service. If this fails that service
// will be stopped and an exception raised
- ServiceThread st = new ServiceThread(sd);
+ ServiceThread st = new ServiceThread(sd, services);
threads.add(st);
}