You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/06/13 00:01:36 UTC

tez git commit: TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 1d11ad275 -> 51fb3e4cf


TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51fb3e4c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51fb3e4c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51fb3e4c

Branch: refs/heads/master
Commit: 51fb3e4cf0213a39e52476127f390639247e089b
Parents: 1d11ad2
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Jun 13 05:15:51 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Jun 13 05:15:51 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 120 ++++++++++++-------
 2 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2229708..73a11fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3295. TestOrderedWordCount should handle relative input/output paths.
   TEZ-3290. Set full task attempt id string in MRInput configuration object.
@@ -58,6 +59,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3290. Set full task attempt id string in MRInput configuration object.
   TEZ-3280. LOG MRInputHelpers split generation message as INFO
@@ -506,6 +508,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3280. LOG MRInputHelpers split generation message as INFO
   TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter.

http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b22af1a..6b79e98 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1593,18 +1593,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       } finally {
         writeLock.unlock();
       }
-      
-      readLock.lock();
-      try {
-        for (ScheduleTaskRequest task : tasksToSchedule) {
-          TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
-          TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
-          boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null;
-          eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
-              getTaskLocationHint(taskId), fromRecovery));
-        }
-      } finally {
-        readLock.unlock();
+
+      /**
+       * read lock is not needed here. For e.g after starting task
+       * scheduling on the vertex, it would not change numTasks. Rest of
+       * the methods creating remote task specs have their
+       * own locking mechanisms. Ref: TEZ-3297
+       */
+      for (ScheduleTaskRequest task : tasksToSchedule) {
+        TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
+        TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+        boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null;
+        eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
+            getTaskLocationHint(taskId), fromRecovery));
       }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -4040,17 +4041,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setInputVertices(Map<Vertex, Edge> inVertices) {
-    this.sourceVertices = inVertices;
-    for (Vertex vertex : sourceVertices.keySet()) {
-      addIO(vertex.getName());
+    writeLock.lock();
+    try {
+      this.sourceVertices = inVertices;
+      for (Vertex vertex : sourceVertices.keySet()) {
+        addIO(vertex.getName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
   @Override
   public void setOutputVertices(Map<Vertex, Edge> outVertices) {
-    this.targetVertices = outVertices;
-    for (Vertex vertex : targetVertices.keySet()) {
-      addIO(vertex.getName());
+    writeLock.lock();
+    try {
+      this.targetVertices = outVertices;
+      for (Vertex vertex : targetVertices.keySet()) {
+        addIO(vertex.getName());
+      }
+    } finally {
+      writeLock.unlock();;
     }
   }
 
@@ -4250,9 +4261,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
+    // For locking strategy, please refer to getOutputSpecList()
     readLock.lock();
+    List<InputSpec> inputSpecList = null;
     try {
-      List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+      inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
           + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
       if (rootInputDescriptors != null) {
         for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
@@ -4262,44 +4275,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                   rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
         }
       }
-      for(Vertex vertex : getInputVertices().keySet()) {
-        /**
-         * It is possible that setParallelism is in the middle of processing in target vertex with
-         * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
-         * get consistent view.
-         * Refer TEZ-2251
-         */
-        InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
-        // TODO DAGAM This should be based on the edge type.
-        inputSpecList.add(inputSpec);
-      }
-      return inputSpecList;
     } finally {
       readLock.unlock();
     }
+
+    for(Vertex vertex : getInputVertices().keySet()) {
+      /**
+       * It is possible that setParallelism is in the middle of processing in target vertex with
+       * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+       * get consistent view.
+       * Refer TEZ-2251
+       */
+      InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
+      // TODO DAGAM This should be based on the edge type.
+      inputSpecList.add(inputSpec);
+    }
+    return inputSpecList;
   }
 
   @Override
   public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
+    /**
+     * Ref: TEZ-3297
+     * Locking entire method could introduce a nested lock and
+     * could lead to deadlock in corner cases. Example of deadlock with nested lock here:
+     * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets
+     * writeLock.
+     * 2. In thread#2, currentVertex acquires read lock
+     * 3. In thread#3, central dispatcher tries to process an event for current vertex,
+     * so tries to acquire write lock.
+     *
+     * In further processing,
+     * 4. In thread#1, it tries to acquire readLock on current vertex for setting edges. But
+     * this would be blocked as #3 already requested for write lock
+     * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock
+     * is held by thread#1.
+     * 6. thread#3 is anyways blocked due to thread#2's read lock on current vertex.
+     */
+
+    List<OutputSpec> outputSpecList = null;
     readLock.lock();
     try {
-      List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+      outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
           + this.additionalOutputSpecs.size());
       outputSpecList.addAll(additionalOutputSpecs);
-      for(Vertex vertex : targetVertices.keySet()) {
-        /**
-         * It is possible that setParallelism (which could change numTasks) is in the middle of
-         * processing in target vertex with its write lock. So we need to get outputspec by
-         * acquiring read lock in target vertex to get consistent view.
-         * Refer TEZ-2251
-         */
-        OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
-        outputSpecList.add(outputSpec);
-      }
-      return outputSpecList;
     } finally {
       readLock.unlock();
     }
+
+    for(Vertex vertex : targetVertices.keySet()) {
+      /**
+       * It is possible that setParallelism (which could change numTasks) is in the middle of
+       * processing in target vertex with its write lock. So we need to get outputspec by
+       * acquiring read lock in target vertex to get consistent view.
+       * Refer TEZ-2251
+       */
+      OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
+      outputSpecList.add(outputSpec);
+    }
+    return outputSpecList;
   }
 
   private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws