You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/05/04 00:00:12 UTC
[1/3] tez git commit: TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Repository: tez
Updated Branches:
refs/heads/branch-0.7.1 258d68131 -> 81c09a069
TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8daa21b6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8daa21b6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8daa21b6
Branch: refs/heads/branch-0.7.1
Commit: 8daa21b649e67a4e4365bbe744cfeba770abe97d
Parents: 0ba1e97
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue May 3 14:54:49 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue May 3 14:54:49 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 44 +++++++++++---------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 20 +++------
.../app/dag/impl/TestTaskAttemptRecovery.java | 5 ++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 14 ++++---
6 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36d3d55..8a9ae7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop.
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 9b8fd80..a2da34a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -62,7 +62,6 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
@@ -99,7 +98,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
-import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -180,8 +178,9 @@ public class TaskAttemptImpl implements TaskAttempt,
private String nodeHttpAddress;
private String nodeRackName;
- private final Task task;
private final Vertex vertex;
+ private final TaskLocationHint locationHint;
+ private TaskSpec taskSpec;
@VisibleForTesting
boolean appendNextDataEvent = true;
@@ -451,22 +450,25 @@ public class TaskAttemptImpl implements TaskAttempt,
private boolean recoveryStartEventSeen = false;
@SuppressWarnings("rawtypes")
- public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Task task) {
- this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+ Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+ this(attemptId, eventHandler, taskAttemptListener, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
- task, null);
+ vertex, locationHint, taskSpec, null);
}
- public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+
+ @SuppressWarnings("rawtypes")
+ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Task task, TezTaskAttemptID schedulingCausalTA) {
+ Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
+ TezTaskAttemptID schedulingCausalTA) {
MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
@@ -482,15 +484,16 @@ public class TaskAttemptImpl implements TaskAttempt,
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
- this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+ this.attemptId = attemptId;
this.eventHandler = eventHandler;
//Reported status
this.conf = conf;
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
- this.task = task;
- this.vertex = this.task.getVertex();
+ this.vertex = vertex;
+ this.locationHint = locationHint;
+ this.taskSpec = taskSpec;
this.creationCausalTA = schedulingCausalTA;
this.creationTime = clock.getTime();
@@ -533,20 +536,20 @@ public class TaskAttemptImpl implements TaskAttempt,
return creationCausalTA;
}
- TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
- TaskSpec baseTaskSpec = task.getBaseTaskSpec();
- if (baseTaskSpec == null) {
+ TaskSpec getTaskSpec() throws AMUserCodeException {
+ if (taskSpec == null) {
// since recovery does not follow normal transitions, TaskEventScheduleTask
// is not being honored by the recovery code path. Using this to workaround
// until recovery is fixed. Calling the non-locking internal method of the vertex
// to get the taskSpec directly. Since everything happens on the central dispatcher
// during recovery this is deadlock free for now. TEZ-1019 should remove the need for this.
- baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
- }
- return new TaskSpec(getID(),
+ TaskSpec baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
+ taskSpec = new TaskSpec(getID(),
baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+ }
+ return taskSpec;
}
@Override
@@ -1065,7 +1068,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
private TaskLocationHint getTaskLocationHint() {
- return task.getTaskLocationHint();
+ return locationHint;
}
protected String[] resolveHosts(String[] src) {
@@ -1170,7 +1173,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Create the remote task.
TaskSpec remoteTaskSpec;
try {
- remoteTaskSpec = ta.createRemoteTaskSpec();
+ remoteTaskSpec = ta.getTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
}
@@ -1183,6 +1186,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
}
+
// Create startTaskRequest
String[] requestHosts = new String[0];
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ad678d7..4a0742f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -95,6 +95,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
@@ -826,9 +827,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
- return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
+ TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+ TaskSpec taskSpec = null;
+ if (baseTaskSpec != null) {
+ taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+ baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+ baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+ }
+ return new TaskAttemptImpl(attemptId, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
- (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
+ (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
+ locationHint, taskSpec, schedulingCausalTA);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6f06f2d..97108d4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -100,6 +100,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -122,7 +123,6 @@ public class TestTaskAttempt {
}
AppContext appCtx;
- Task mockTask;
TaskLocationHint locationHint;
@BeforeClass
@@ -133,7 +133,6 @@ public class TestTaskAttempt {
@Before
public void setupTest() {
appCtx = mock(AppContext.class);
- mockTask = mock(Task.class);
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
}
@@ -168,7 +167,6 @@ public class TestTaskAttempt {
+ AMSchedulerEventTALaunchRequest.class.getName());
}
- verify(mockTask, times(1)).getTaskLocationHint();
// TODO Move the Rack request check to the client after TEZ-125 is fixed.
Set<String> requestedRacks = taImpl.taskRacks;
assertEquals(1, requestedRacks.size());
@@ -1567,23 +1565,17 @@ public class TestTaskAttempt {
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex) {
- super(taskId, attemptNumber, eventHandler, tal, conf,
+ super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
+ eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex, mockTask);
- when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
+ isRescheduled, resource, containerContext, leafVertex, mock(Vertex.class),
+ locationHint, null);
}
-
- Vertex mockVertex = mock(Vertex.class);
boolean inputFailedReported = false;
@Override
- protected Vertex getVertex() {
- return mockVertex;
- }
-
- @Override
- protected TaskSpec createRemoteTaskSpec() {
+ protected TaskSpec getTaskSpec() {
// FIXME
return null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index cd37ab9..197a442 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -148,11 +149,11 @@ public class TestTaskAttemptRecovery {
TezTaskID taskId =
TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
ta =
- new TaskAttemptImpl(taskId, 0, mockEventHandler,
+ new TaskAttemptImpl(TezBuilderUtils.newTaskAttemptId(taskId, 0), mockEventHandler,
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class), false, mockTask);
+ mock(ContainerContext.class), false, mockVertex, null, null);
taId = ta.getID();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 3b154a5..b7a6d21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -72,6 +72,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -888,8 +889,9 @@ public class TestTaskImpl {
@Override
protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
- MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
- attemptNumber, eventHandler, taskAttemptListener,
+ MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
+ TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
+ eventHandler, taskAttemptListener,
conf, clock, taskHeartbeatHandler, appContext,
true, taskResource, containerContext, schedCausalTA);
taskAttempts.add(attempt);
@@ -934,14 +936,14 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
- public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+ public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
- super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
- schedCausalTA);
+ super(attemptId, eventHandler, tal, conf, clock, thh,
+ appContext, isRescheduled, resource, containerContext, false,
+ mock(Vertex.class), locationHint, mockTaskSpec, schedCausalTA);
}
@Override
[3/3] tez git commit: TEZ-3236. Update release date in CHANGES.txt. (hitesh)
Posted by hi...@apache.org.
TEZ-3236. Update release date in CHANGES.txt. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/81c09a06
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/81c09a06
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/81c09a06
Branch: refs/heads/branch-0.7.1
Commit: 81c09a069a403793aef080b09605441606b47276
Parents: 618bd93
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue May 3 14:59:47 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue May 3 14:59:47 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/81c09a06/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54c4314..fb456a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
Apache Tez Change Log
=====================
-Release 0.7.1: 2015-05-05
+Release 0.7.1: 2015-05-10
INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
[2/3] tez git commit: Merge remote-tracking branch 'origin/branch-0.7' into branch-0.7.1
Posted by hi...@apache.org.
Merge remote-tracking branch 'origin/branch-0.7' into branch-0.7.1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/618bd93a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/618bd93a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/618bd93a
Branch: refs/heads/branch-0.7.1
Commit: 618bd93a1e7bcd0359d23e82c8fe983b1971e27f
Parents: 258d681 8daa21b
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue May 3 14:58:44 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue May 3 14:58:44 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 44 +++++++++++---------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 20 +++------
.../app/dag/impl/TestTaskAttemptRecovery.java | 5 ++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 14 ++++---
6 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/618bd93a/CHANGES.txt
----------------------------------------------------------------------