You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/03/22 20:56:49 UTC
[tez] branch master updated: TEZ-4045. Task should be accessible from TaskAttempt
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new d5675c3 TEZ-4045. Task should be accessible from TaskAttempt
d5675c3 is described below
commit d5675c332497c1ac1dedefdf91e87476b5c0d7a9
Author: Ying Han <hy...@outlook.com>
AuthorDate: Fri Mar 22 15:56:39 2019 -0500
TEZ-4045. Task should be accessible from TaskAttempt
Signed-off-by: Jonathan Eagles <je...@apache.org>
---
.../main/java/org/apache/tez/dag/app/dag/Task.java | 8 ++++++
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 9 ++----
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 32 ++++++++++------------
.../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++++++--
.../tez/dag/app/rm/DagAwareYarnTaskScheduler.java | 2 +-
.../tez/dag/app/rm/YarnTaskSchedulerService.java | 7 +++--
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 8 ++++--
.../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 4 +--
.../dag/app/rm/TestDagAwareYarnTaskScheduler.java | 5 +++-
.../apache/tez/dag/app/rm/TestTaskScheduler.java | 5 +++-
10 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index d1b9b2a..c1fe7f7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -21,7 +21,9 @@ package org.apache.tez.dag.app.dag;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskReport;
@@ -73,4 +75,10 @@ public interface Task {
long getFirstAttemptStartTime();
long getFinishTime();
+
+ /**
+ * @return the set of nodes on which previous attempts were running at the time
+ * the latest attempt was scheduled.
+ */
+ Set<NodeId> getNodesWithRunningAttempts();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index d0fec5c..0fc7013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,7 +19,6 @@
package org.apache.tez.dag.app.dag;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.api.records.Container;
@@ -72,7 +71,8 @@ public interface TaskAttempt {
TezTaskID getTaskID();
TezVertexID getVertexID();
TezDAGID getDAGID();
-
+
+ Task getTask();
TaskAttemptReport getReport();
List<String> getDiagnostics();
TaskAttemptTerminationCause getTerminationCause();
@@ -136,9 +136,4 @@ public interface TaskAttempt {
* yet, returns 0.
*/
long getFinishTime();
-
- /**
- * @return the set of nodes on which sibling attempts were running on.
- */
- Set<NodeId> getNodesWithSiblingRunningAttempts();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 4608052..ade7bc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
@@ -198,6 +199,7 @@ public class TaskAttemptImpl implements TaskAttempt,
private String nodeRackName;
private final Vertex vertex;
+ private final Task task;
private final TaskLocationHint locationHint;
private final TaskSpec taskSpec;
@@ -228,12 +230,6 @@ public class TaskAttemptImpl implements TaskAttempt,
private final boolean leafVertex;
private TezTaskAttemptID creationCausalTA;
- // Record the set of nodes on which sibling attempts were running on, at the time of
- // this attempt being scheduled. This set is empty for original task attempt, and
- // non-empty when current task attempt is a speculative one, in which case scheduler
- // should try to schedule the speculative attempt on to a node other than the one(s)
- // recorded in this set.
- private Set<NodeId> nodesWithSiblingRunningAttempts;
private long creationTime;
private long scheduledTime;
@@ -543,10 +539,10 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+ Task task, TaskLocationHint locationHint, TaskSpec taskSpec) {
this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
- vertex, locationHint, taskSpec, null, null);
+ task, locationHint, taskSpec, null);
}
@SuppressWarnings("rawtypes")
@@ -555,8 +551,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
- Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
- TezTaskAttemptID schedulingCausalTA, Set<NodeId> nodesWithSiblingRunningAttempts) {
+ Task task, TaskLocationHint locationHint, TaskSpec taskSpec,
+ TezTaskAttemptID schedulingCausalTA) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
@@ -568,11 +564,11 @@ public class TaskAttemptImpl implements TaskAttempt,
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
- this.vertex = vertex;
+ this.vertex = task.getVertex();
+ this.task = task;
this.locationHint = locationHint;
this.taskSpec = taskSpec;
this.creationCausalTA = schedulingCausalTA;
- this.nodesWithSiblingRunningAttempts = nodesWithSiblingRunningAttempts;
this.creationTime = clock.getTime();
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
@@ -616,11 +612,6 @@ public class TaskAttemptImpl implements TaskAttempt,
}
@Override
- public Set<NodeId> getNodesWithSiblingRunningAttempts() {
- return nodesWithSiblingRunningAttempts;
- }
-
- @Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
readLock.lock();
@@ -866,7 +857,12 @@ public class TaskAttemptImpl implements TaskAttempt,
readLock.unlock();
}
}
-
+
+ @Override
+ public Task getTask() {
+ return task;
+ }
+
Vertex getVertex() {
return vertex;
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9e3c5a8..9289d8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -152,6 +152,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// track the status of TaskAttempt (true mean completed, false mean uncompleted)
private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
+ // The set of nodes with active running attempts at the time the latest attempt for
+ // this task was scheduled. This set is empty when scheduling the original task attempt, and
+ // non-empty when scheduling a speculative attempt, in which case the scheduler should avoid
+ // scheduling the speculative attempt onto the node(s) recorded in this set.
private final Set<NodeId> nodesWithRunningAttempts = Collections
.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
@@ -584,6 +588,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ @Override
+ public Set<NodeId> getNodesWithRunningAttempts() {
+ return nodesWithRunningAttempts;
+ }
+
@VisibleForTesting
public TaskStateInternal getInternalState() {
readLock.lock();
@@ -749,8 +758,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
baseTaskSpec.getTaskConf());
return new TaskAttemptImpl(attemptId, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
- (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
- locationHint, taskSpec, schedulingCausalTA, nodesWithRunningAttempts);
+ (failedAttempts > 0), taskResource, containerContext, leafVertex, this,
+ locationHint, taskSpec, schedulingCausalTA);
}
@Override
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
index 3191c48..6a78425 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
@@ -1132,7 +1132,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
private boolean maybeChangeNode(TaskRequest request, NodeId nodeId) {
Object task = request.getTask();
if (task instanceof TaskAttempt) {
- Set<NodeId> nodesWithSiblingRunningAttempts = ((TaskAttempt) task).getNodesWithSiblingRunningAttempts();
+ Set<NodeId> nodesWithSiblingRunningAttempts = ((TaskAttempt) task).getTask().getNodesWithRunningAttempts();
if (nodesWithSiblingRunningAttempts != null
&& nodesWithSiblingRunningAttempts.contains(nodeId)) {
return true;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f128ec9..a327967 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1536,7 +1536,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
HeldContainer heldContainer = heldContainers.get(container.getId());
Object task = getTask(cookieContainerRequest);
if (task instanceof TaskAttempt
- && ((TaskAttempt) task).getNodesWithSiblingRunningAttempts().contains(container.getNodeId())) {
+ && ((TaskAttempt) task).getTask() != null
+ && ((TaskAttempt) task).getTask().getNodesWithRunningAttempts().contains(container.getNodeId())) {
return false;
}
if (heldContainer == null || heldContainer.isNew()) { // New container.
@@ -1790,9 +1791,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
Container container = entry.getValue();
// check for blacklisted nodes. There may be race conditions between
// setting blacklist and receiving allocations
- CookieContainerRequest request = entry.getKey();
- Object task = getTask(request);
if (blacklistedNodes.contains(container.getNodeId())) {
+ CookieContainerRequest request = entry.getKey();
+ Object task = getTask(request);
LOG.info("Container: " + container.getId() +
" allocated on blacklisted node: " + container.getNodeId() +
" for task: " + task);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index ce3e7e5..41cce3b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -145,6 +146,7 @@ public class TestTaskAttempt {
TezConfiguration vertexConf = new TezConfiguration();
TaskLocationHint locationHint;
Vertex mockVertex;
+ Task mockTask;
ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
.setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
@@ -161,6 +163,8 @@ public class TestTaskAttempt {
TezConstants.getTezYarnServicePluginName());
createMockVertex(vertexConf);
+ mockTask = mock(Task.class);
+ when(mockTask.getVertex()).thenReturn(mockVertex);
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
@@ -2193,8 +2197,8 @@ public class TestTaskAttempt {
super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex, mockVertex,
- locationHint, null, null, null);
+ isRescheduled, resource, containerContext, leafVertex, mockTask,
+ locationHint, null, null);
}
boolean inputFailedReported = false;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 51a4bdf..81cd675 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -1365,8 +1365,8 @@ public class TestTaskImpl {
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
super(attemptId, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false, null,
- locationHint, mockTaskSpec, schedCausalTA, null);
+ appContext, isRescheduled, resource, containerContext, false, mockTask,
+ locationHint, mockTaskSpec, schedCausalTA);
}
@Override
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index c979a7a..ad0cf07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.AMRMClientAsyncWrapper;
import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.HeldContainer;
@@ -350,7 +351,9 @@ public class TestDagAwareYarnTaskScheduler {
NodeId speculativeNodeId = mock(NodeId.class);
when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
TaskAttempt mockTask5 = mock(TaskAttempt.class);
- when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+ Task task = mock(Task.class);
+ when(mockTask5.getTask()).thenReturn(task);
+ when(task.getNodesWithRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
Object mockCookie5 = new Object();
scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
mockPriority, null, mockCookie5);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 21f4c52..965b8d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
@@ -352,7 +353,9 @@ public class TestTaskScheduler {
NodeId speculativeNodeId = mock(NodeId.class);
when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
TaskAttempt mockTask5 = mock(TaskAttempt.class);
- when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+ Task task = mock(Task.class);
+ when(mockTask5.getTask()).thenReturn(task);
+ when(task.getNodesWithRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
Object mockCookie5 = new Object();
scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
mockPriority, null, mockCookie5);