You are viewing a plain-text version of this content; the canonical link to it (originally a hyperlink labeled "here") was not preserved in this copy.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/05/02 20:05:51 UTC
tez git commit: TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Repository: tez
Updated Branches:
refs/heads/master 727584fd2 -> c3b8b8523
TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c3b8b852
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c3b8b852
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c3b8b852
Branch: refs/heads/master
Commit: c3b8b852331c41f9a9b41c7c74995f52f4578d99
Parents: 727584f
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 2 11:00:48 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 2 11:00:48 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 61 ++++++--------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 11 +++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 21 ++-----
.../tez/dag/app/dag/impl/TestTaskImpl.java | 13 +++--
5 files changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b436b9a..37d47cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
@@ -27,6 +28,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
@@ -465,6 +467,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6d9247e..e39315b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
@@ -71,7 +70,6 @@ import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
@@ -107,7 +105,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
-import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -190,8 +187,9 @@ public class TaskAttemptImpl implements TaskAttempt,
private String nodeHttpAddress;
private String nodeRackName;
- private final Task task;
private final Vertex vertex;
+ private final TaskLocationHint locationHint;
+ private final TaskSpec taskSpec;
@VisibleForTesting
boolean appendNextDataEvent = true;
@@ -465,22 +463,25 @@ public class TaskAttemptImpl implements TaskAttempt,
.installTopology();
@SuppressWarnings("rawtypes")
- public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Task task) {
- this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock,
+ Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+ this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
- task, null);
+ vertex, locationHint, taskSpec, null);
}
- public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+
+ @SuppressWarnings("rawtypes")
+ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Task task, TezTaskAttemptID schedulingCausalTA) {
+ Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
+ TezTaskAttemptID schedulingCausalTA) {
MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
@@ -496,15 +497,16 @@ public class TaskAttemptImpl implements TaskAttempt,
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
- this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+ this.attemptId = attemptId;
this.eventHandler = eventHandler;
//Reported status
this.conf = conf;
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
- this.task = task;
- this.vertex = this.task.getVertex();
+ this.vertex = vertex;
+ this.locationHint = locationHint;
+ this.taskSpec = taskSpec;
this.creationCausalTA = schedulingCausalTA;
this.creationTime = clock.getTime();
@@ -548,14 +550,6 @@ public class TaskAttemptImpl implements TaskAttempt,
return creationCausalTA;
}
- TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
- TaskSpec baseTaskSpec = task.getBaseTaskSpec();
- return new TaskSpec(getID(),
- baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
- baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
- baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
- }
-
@Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
@@ -1036,7 +1030,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
private TaskLocationHint getTaskLocationHint() {
- return task.getTaskLocationHint();
+ return locationHint;
}
protected String[] resolveHosts(String[] src) {
@@ -1226,22 +1220,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
ta.scheduledTime = ta.clock.getTime();
- // Create the remote task.
- TaskSpec remoteTaskSpec;
- try {
- remoteTaskSpec = ta.createRemoteTaskSpec();
- if (LOG.isDebugEnabled()) {
- LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
- }
- } catch (AMUserCodeException e) {
- String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
- LOG.error(msg, e);
- String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
- new TerminateTransition(FAILED_HELPER).transition(ta,
- new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag,
- TaskAttemptTerminationCause.APPLICATION_ERROR));
- return TaskAttemptStateInternal.FAILED;
- }
// Create startTaskRequest
String[] requestHosts = new String[0];
@@ -1271,10 +1249,7 @@ public class TaskAttemptImpl implements TaskAttempt,
locationHint = null;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asking for container launch with taskAttemptContext: "
- + remoteTaskSpec);
- }
+ LOG.debug("Asking for container launch with taskAttemptContext: {}", ta.taskSpec);
// Send out a launch request to the scheduler.
int priority;
@@ -1288,7 +1263,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups.
Vertex vertex = ta.getVertex();
AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
- ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
+ ta.attemptId, ta.taskResource, ta.taskSpec, ta, locationHint,
priority, ta.containerContext,
vertex.getTaskSchedulerIdentifier(),
vertex.getContainerLauncherIdentifier(),
http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 26ba004..28a1c5e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -93,6 +93,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
@@ -718,9 +719,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
- return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
+ TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+ TaskSpec taskSpec = new TaskSpec(attemptId,
+ baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+ baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+ baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+ return new TaskAttemptImpl(attemptId, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
- (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
+ (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
+ locationHint, taskSpec, schedulingCausalTA);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index e4cd956..a50ca49 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -78,7 +78,6 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -112,10 +111,10 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.junit.Assert;
@@ -139,7 +138,6 @@ public class TestTaskAttempt {
}
AppContext appCtx;
- Task mockTask;
TaskLocationHint locationHint;
Vertex mockVertex;
ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
@@ -156,9 +154,7 @@ public class TestTaskAttempt {
when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
TezConstants.getTezYarnServicePluginName());
- mockTask = mock(Task.class);
mockVertex = mock(Vertex.class);
- when(mockTask.getVertex()).thenReturn(mockVertex);
when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
@@ -196,7 +192,6 @@ public class TestTaskAttempt {
+ AMSchedulerEventTALaunchRequest.class.getName());
}
- verify(mockTask, times(1)).getTaskLocationHint();
// TODO Move the Rack request check to the client after TEZ-125 is fixed.
Set<String> requestedRacks = taImpl.taskRacks;
assertEquals(1, requestedRacks.size());
@@ -1742,12 +1737,12 @@ public class TestTaskAttempt {
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex) {
- super(taskId, attemptNumber, eventHandler, tal, conf,
+ super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
+ eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex, mockTask, null);
- when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
+ isRescheduled, resource, containerContext, leafVertex, mockVertex,
+ locationHint, null, null);
}
-
boolean inputFailedReported = false;
@@ -1757,12 +1752,6 @@ public class TestTaskAttempt {
}
@Override
- protected TaskSpec createRemoteTaskSpec() {
- // FIXME
- return null;
- }
-
- @Override
protected void logJobHistoryAttemptStarted() {
taskAttemptStartedEventLogged++;
super.logJobHistoryAttemptStarted();
http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index f88ab7c..fb2f543 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -82,6 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -948,8 +949,9 @@ public class TestTaskImpl {
@Override
protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
- MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
- attemptNumber, eventHandler, taskCommunicatorManagerInterface,
+ MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
+ TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
+ eventHandler, taskCommunicatorManagerInterface,
conf, clock, taskHeartbeatHandler, appContext,
true, taskResource, containerContext, schedCausalTA);
taskAttempts.add(attempt);
@@ -995,13 +997,14 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
- public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+ public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
- super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA);
+ super(attemptId, eventHandler, tal, conf, clock, thh,
+ appContext, isRescheduled, resource, containerContext, false, null,
+ locationHint, mockTaskSpec, schedCausalTA);
}
@Override