You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2016/02/18 02:49:06 UTC
tez git commit: TEZ-3117. Deadlock in Edge and Vertex code (bikas)
Repository: tez
Updated Branches:
refs/heads/master a812c3462 -> de3a0748f
TEZ-3117. Deadlock in Edge and Vertex code (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de3a0748
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de3a0748
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de3a0748
Branch: refs/heads/master
Commit: de3a0748ff19b5ced87050596d088bdb573cae05
Parents: a812c34
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 17 17:48:55 2016 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 17 17:48:55 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/dag/app/dag/impl/Edge.java | 59 ++++++++++++--------
2 files changed, 37 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/de3a0748/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af643dd..d10b47a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3117. Deadlock in Edge and Vertex code
TEZ-3103. Shuffle can hang when memory to memory merging enabled
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
@@ -334,6 +335,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3117. Deadlock in Edge and Vertex code
TEZ-3103. Shuffle can hang when memory to memory merging enabled
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
http://git-wip-us.apache.org/repos/asf/tez/blob/de3a0748/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 0be7790..bb4d319 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -174,19 +174,24 @@ public class Edge {
+ getEdgeInfo(), e);
}
}
- destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT,
- destinationVertex.getName(),
- sourceVertex.getName(),
- null);
+ synchronized (this) {
+ destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+ destinationVertex.getName(),
+ sourceVertex.getName(),
+ null);
+ }
}
- public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
- this.edgeProperty = newEdgeProperty;
- boolean wasUnInitialized = (edgeManager == null);
- try {
- createEdgeManager();
- } catch (TezException e) {
- throw new AMUserCodeException(Source.EdgeManager, e);
+ public void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
+ boolean wasUnInitialized;
+ synchronized (this) {
+ this.edgeProperty = newEdgeProperty;
+ wasUnInitialized = (edgeManager == null);
+ try {
+ createEdgeManager();
+ } catch (TezException e) {
+ throw new AMUserCodeException(Source.EdgeManager, e);
+ }
}
initialize();
if (wasUnInitialized) {
@@ -199,7 +204,7 @@ public class Edge {
// Test only method for creating specific scenarios
@VisibleForTesting
- synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
+ void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
throws AMUserCodeException {
EdgeProperty modifiedEdgeProperty =
EdgeProperty.create(descriptor,
@@ -210,22 +215,28 @@ public class Edge {
setEdgeProperty(modifiedEdgeProperty);
}
- public synchronized void routingToBegin() throws AMUserCodeException {
- if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
- routingNeeded = false;
- } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
- throw new TezUncheckedException(
- "Internal error. Not expected to route events to a destination until parallelism is determined" +
- " sourceVertex=" + sourceVertex.getLogIdentifier() +
- " edgeManager=" + edgeManager.getClass().getName());
+ public void routingToBegin() throws AMUserCodeException {
+ int numDestTasks = edgeManagerContext.getDestinationVertexNumTasks();
+ synchronized (this) {
+ if (numDestTasks == 0) {
+ routingNeeded = false;
+ } else if (numDestTasks < 0) {
+ throw new TezUncheckedException(
+ "Internal error. Not expected to route events to a destination until parallelism is determined" +
+ " sourceVertex=" + sourceVertex.getLogIdentifier() +
+ " edgeManager=" + edgeManager.getClass().getName());
+ }
+ if (edgeManager instanceof EdgeManagerPluginOnDemand) {
+ onDemandRouting = true;
+ }
}
- if (edgeManager instanceof EdgeManagerPluginOnDemand) {
- onDemandRouting = true;
+
+ if (onDemandRouting) {
try {
- ((EdgeManagerPluginOnDemand)edgeManager).prepareForRouting();
+ ((EdgeManagerPluginOnDemand) edgeManager).prepareForRouting();
} catch (Exception e) {
throw new AMUserCodeException(Source.EdgeManager,
- "Fail to prepareForRouting " + getEdgeInfo(), e);
+ "Fail to prepareForRouting " + getEdgeInfo(), e);
}
}