You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2016/02/18 02:53:10 UTC

tez git commit: TEZ-3117. Deadlock in Edge and Vertex code (bikas) (cherry picked from commit de3a0748ff19b5ced87050596d088bdb573cae05)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 126217147 -> b55ba592b


TEZ-3117. Deadlock in Edge and Vertex code (bikas)
(cherry picked from commit de3a0748ff19b5ced87050596d088bdb573cae05)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b55ba592
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b55ba592
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b55ba592

Branch: refs/heads/branch-0.7
Commit: b55ba592b475e5ec391541b41e99e0c2542d2c6d
Parents: 1262171
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 17 17:48:55 2016 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 17 17:52:51 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 59 ++++++++++++--------
 2 files changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b55ba592/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index adf23a9..81a50a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-3117. Deadlock in Edge and Vertex code
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
   TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier

http://git-wip-us.apache.org/repos/asf/tez/blob/b55ba592/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 0be7790..bb4d319 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -174,19 +174,24 @@ public class Edge {
             + getEdgeInfo(), e);
       }
     }
-    destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, 
-        destinationVertex.getName(), 
-        sourceVertex.getName(), 
-        null);
+    synchronized (this) {
+      destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+              destinationVertex.getName(),
+              sourceVertex.getName(),
+              null);
+    }
   }
 
-  public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
-    this.edgeProperty = newEdgeProperty;
-    boolean wasUnInitialized = (edgeManager == null);
-    try {
-      createEdgeManager();
-    } catch (TezException e) {
-      throw new AMUserCodeException(Source.EdgeManager, e);
+  public void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
+    boolean wasUnInitialized;
+    synchronized (this) {
+      this.edgeProperty = newEdgeProperty;
+      wasUnInitialized = (edgeManager == null);
+      try {
+        createEdgeManager();
+      } catch (TezException e) {
+        throw new AMUserCodeException(Source.EdgeManager, e);
+      }
     }
     initialize();
     if (wasUnInitialized) {
@@ -199,7 +204,7 @@ public class Edge {
   
   // Test only method for creating specific scenarios
   @VisibleForTesting
-  synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
+  void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
       throws AMUserCodeException {
     EdgeProperty modifiedEdgeProperty =
         EdgeProperty.create(descriptor,
@@ -210,22 +215,28 @@ public class Edge {
     setEdgeProperty(modifiedEdgeProperty);
   }
   
-  public synchronized void routingToBegin() throws AMUserCodeException {
-    if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
-      routingNeeded = false;
-    } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
-      throw new TezUncheckedException(
-          "Internal error. Not expected to route events to a destination until parallelism is determined" +
-          " sourceVertex=" + sourceVertex.getLogIdentifier() +
-          " edgeManager=" + edgeManager.getClass().getName());
+  public void routingToBegin() throws AMUserCodeException {
+    int numDestTasks = edgeManagerContext.getDestinationVertexNumTasks();
+    synchronized (this) {
+      if (numDestTasks == 0) {
+        routingNeeded = false;
+      } else if (numDestTasks < 0) {
+        throw new TezUncheckedException(
+                "Internal error. Not expected to route events to a destination until parallelism is determined" +
+                        " sourceVertex=" + sourceVertex.getLogIdentifier() +
+                        " edgeManager=" + edgeManager.getClass().getName());
+      }
+      if (edgeManager instanceof EdgeManagerPluginOnDemand) {
+        onDemandRouting = true;
+      }
     }
-    if (edgeManager instanceof EdgeManagerPluginOnDemand) {
-      onDemandRouting = true;
+
+    if (onDemandRouting) {
       try {
-        ((EdgeManagerPluginOnDemand)edgeManager).prepareForRouting();
+        ((EdgeManagerPluginOnDemand) edgeManager).prepareForRouting();
       } catch (Exception e) {
         throw new AMUserCodeException(Source.EdgeManager,
-            "Fail to prepareForRouting " + getEdgeInfo(), e);
+                "Fail to prepareForRouting " + getEdgeInfo(), e);
       }
     }