You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/06/13 00:01:36 UTC
tez git commit: TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 1d11ad275 -> 51fb3e4cf
TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51fb3e4c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51fb3e4c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51fb3e4c
Branch: refs/heads/master
Commit: 51fb3e4cf0213a39e52476127f390639247e089b
Parents: 1d11ad2
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Jun 13 05:15:51 2016 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Jun 13 05:15:51 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 120 ++++++++++++-------
2 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2229708..73a11fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
TEZ-3295. TestOrderedWordCount should handle relative input/output paths.
TEZ-3290. Set full task attempt id string in MRInput configuration object.
@@ -58,6 +59,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
TEZ-3290. Set full task attempt id string in MRInput configuration object.
TEZ-3280. LOG MRInputHelpers split generation message as INFO
@@ -506,6 +508,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
TEZ-3280. LOG MRInputHelpers split generation message as INFO
TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter.
http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b22af1a..6b79e98 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1593,18 +1593,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
} finally {
writeLock.unlock();
}
-
- readLock.lock();
- try {
- for (ScheduleTaskRequest task : tasksToSchedule) {
- TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
- TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
- boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null;
- eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
- getTaskLocationHint(taskId), fromRecovery));
- }
- } finally {
- readLock.unlock();
+
+ /**
+ * The read lock is not needed here: once task scheduling has
+ * started on the vertex, numTasks no longer changes. The other
+ * methods used to create remote task specs acquire their
+ * own locks. Ref: TEZ-3297
+ */
+ for (ScheduleTaskRequest task : tasksToSchedule) {
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
+ TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+ boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null;
+ eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
+ getTaskLocationHint(taskId), fromRecovery));
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -4040,17 +4041,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void setInputVertices(Map<Vertex, Edge> inVertices) {
- this.sourceVertices = inVertices;
- for (Vertex vertex : sourceVertices.keySet()) {
- addIO(vertex.getName());
+ writeLock.lock();
+ try {
+ this.sourceVertices = inVertices;
+ for (Vertex vertex : sourceVertices.keySet()) {
+ addIO(vertex.getName());
+ }
+ } finally {
+ writeLock.unlock();
}
}
@Override
public void setOutputVertices(Map<Vertex, Edge> outVertices) {
- this.targetVertices = outVertices;
- for (Vertex vertex : targetVertices.keySet()) {
- addIO(vertex.getName());
+ writeLock.lock();
+ try {
+ this.targetVertices = outVertices;
+ for (Vertex vertex : targetVertices.keySet()) {
+ addIO(vertex.getName());
+ }
+ } finally {
+ writeLock.unlock();;
}
}
@@ -4250,9 +4261,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
+ // For locking strategy, please refer to getOutputSpecList()
readLock.lock();
+ List<InputSpec> inputSpecList = null;
try {
- List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+ inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+ (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
if (rootInputDescriptors != null) {
for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
@@ -4262,44 +4275,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
}
}
- for(Vertex vertex : getInputVertices().keySet()) {
- /**
- * It is possible that setParallelism is in the middle of processing in target vertex with
- * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
- * get consistent view.
- * Refer TEZ-2251
- */
- InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
- // TODO DAGAM This should be based on the edge type.
- inputSpecList.add(inputSpec);
- }
- return inputSpecList;
} finally {
readLock.unlock();
}
+
+ for(Vertex vertex : getInputVertices().keySet()) {
+ /**
+ * It is possible that setParallelism is in the middle of processing in target vertex with
+ * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+ * get consistent view.
+ * Refer TEZ-2251
+ */
+ InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
+ // TODO DAGAM This should be based on the edge type.
+ inputSpecList.add(inputSpec);
+ }
+ return inputSpecList;
}
@Override
public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
+ /**
+ * Ref: TEZ-3297
+ * Locking entire method could introduce a nested lock and
+ * could lead to deadlock in corner cases. Example of deadlock with nested lock here:
+ * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets
+ * writeLock.
+ * 2. In thread#2, currentVertex acquires read lock
+ * 3. In thread#3, central dispatcher tries to process an event for current vertex,
+ * so tries to acquire write lock.
+ *
+ * In further processing,
+ * 4. In thread#1, it tries to acquire readLock on current vertex for setting edges. But
+ * this would be blocked as #3 already requested for write lock
+ * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock
+ * is held by thread#1.
+ * 6. thread#3 remains blocked by thread#2's read lock on the current vertex.
+ */
+
+ List<OutputSpec> outputSpecList = null;
readLock.lock();
try {
- List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+ outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+ this.additionalOutputSpecs.size());
outputSpecList.addAll(additionalOutputSpecs);
- for(Vertex vertex : targetVertices.keySet()) {
- /**
- * It is possible that setParallelism (which could change numTasks) is in the middle of
- * processing in target vertex with its write lock. So we need to get outputspec by
- * acquiring read lock in target vertex to get consistent view.
- * Refer TEZ-2251
- */
- OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
- outputSpecList.add(outputSpec);
- }
- return outputSpecList;
} finally {
readLock.unlock();
}
+
+ for(Vertex vertex : targetVertices.keySet()) {
+ /**
+ * It is possible that setParallelism (which could change numTasks) is in the middle of
+ * processing in target vertex with its write lock. So we need to get outputspec by
+ * acquiring read lock in target vertex to get consistent view.
+ * Refer TEZ-2251
+ */
+ OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
+ outputSpecList.add(outputSpec);
+ }
+ return outputSpecList;
}
private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws