You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/02/26 04:39:00 UTC
[tez] branch master updated: TEZ-4042. Speculative attempts should
avoid running on the same node
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 0093f8b TEZ-4042. Speculative attempts should avoid running on the same node
0093f8b is described below
commit 0093f8b4d9ea38fcdfeb802fe3206c4b2ae8f077
Author: Ying Han <hy...@outlook.com>
AuthorDate: Mon Feb 25 22:38:01 2019 -0600
TEZ-4042. Speculative attempts should avoid running on the same node
---
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 5 ++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 16 +++++++++--
.../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 15 +++++++---
.../tez/dag/app/rm/DagAwareYarnTaskScheduler.java | 32 ++++++++++++++++++----
.../tez/dag/app/rm/YarnTaskSchedulerService.java | 12 ++++++--
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +-
.../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 7 ++++-
.../dag/app/rm/TestDagAwareYarnTaskScheduler.java | 19 +++++++++++++
.../apache/tez/dag/app/rm/TestTaskScheduler.java | 18 ++++++++++++
9 files changed, 110 insertions(+), 16 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index ba09bd9..d0fec5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.api.records.Container;
@@ -136,4 +137,8 @@ public interface TaskAttempt {
*/
long getFinishTime();
+ /**
+ * @return the set of nodes on which sibling attempts were running.
+ */
+ Set<NodeId> getNodesWithSiblingRunningAttempts();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3107330..4608052 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -228,6 +228,12 @@ public class TaskAttemptImpl implements TaskAttempt,
private final boolean leafVertex;
private TezTaskAttemptID creationCausalTA;
+ // Records the set of nodes on which sibling attempts were running at the time this
+ // attempt was scheduled. The set is empty for an original task attempt and non-empty
+ // when the current task attempt is a speculative one, in which case the scheduler
+ // should try to place the speculative attempt on a node other than the one(s)
+ // recorded in this set.
+ private Set<NodeId> nodesWithSiblingRunningAttempts;
private long creationTime;
private long scheduledTime;
@@ -540,7 +546,7 @@ public class TaskAttemptImpl implements TaskAttempt,
Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
- vertex, locationHint, taskSpec, null);
+ vertex, locationHint, taskSpec, null, null);
}
@SuppressWarnings("rawtypes")
@@ -550,7 +556,7 @@ public class TaskAttemptImpl implements TaskAttempt,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
- TezTaskAttemptID schedulingCausalTA) {
+ TezTaskAttemptID schedulingCausalTA, Set<NodeId> nodesWithSiblingRunningAttempts) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
@@ -566,6 +572,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.locationHint = locationHint;
this.taskSpec = taskSpec;
this.creationCausalTA = schedulingCausalTA;
+ this.nodesWithSiblingRunningAttempts = nodesWithSiblingRunningAttempts;
this.creationTime = clock.getTime();
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
@@ -609,6 +616,11 @@ public class TaskAttemptImpl implements TaskAttempt,
}
+ /**
+ * Returns the set of nodes with running sibling attempts that was handed to this
+ * attempt at construction.
+ *
+ * @return the recorded node set; never {@code null}. An empty set is returned when no
+ * set was supplied (the shorter constructor passes {@code null}), so callers can
+ * invoke {@code contains} on the result without a null check.
+ */
@Override
+ public Set<NodeId> getNodesWithSiblingRunningAttempts() {
+ if (nodesWithSiblingRunningAttempts == null) {
+ // Fully qualified so that no additional import is required in this file.
+ return java.util.Collections.<NodeId>emptySet();
+ }
+ return nodesWithSiblingRunningAttempts;
+ }
+
+ @Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
readLock.lock();
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9e1d85f..9e3c5a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +39,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.runtime.api.TaskFailureType;
import org.slf4j.Logger;
@@ -149,6 +152,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// track the status of TaskAttempt (true mean completed, false mean uncompleted)
private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
+ private final Set<NodeId> nodesWithRunningAttempts = Collections
+ .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
+
private static final SingleArcTransition<TaskImpl , TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
@@ -744,7 +750,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return new TaskAttemptImpl(attemptId, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
(failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
- locationHint, taskSpec, schedulingCausalTA);
+ locationHint, taskSpec, schedulingCausalTA, nodesWithRunningAttempts);
}
@Override
@@ -1009,14 +1015,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
+ // Schedules a redundant attempt for the task: increments NUM_SPECULATIONS, records the
+ // node of every unfinished attempt in task.nodesWithRunningAttempts, then schedules a
+ // new attempt, passing the ID of the last unfinished attempt seen during iteration.
public void transition(TaskImpl task, TaskEvent event) {
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
- TezTaskAttemptID earliestUnfinishedAttempt = null;
+ TaskAttempt earliestUnfinishedAttempt = null;
for (TaskAttempt ta : task.attempts.values()) {
// find the oldest running attempt
if (!ta.isFinished()) {
- earliestUnfinishedAttempt = ta.getID();
+ earliestUnfinishedAttempt = ta;
+ // The set is backed by a ConcurrentHashMap, which rejects null elements, so skip
+ // attempts that report no node. NOTE(review): presumably getNodeId() is null until
+ // a container is assigned - confirm against TaskAttemptImpl.
+ NodeId runningNode = ta.getNodeId();
+ if (runningNode != null) {
+ task.nodesWithRunningAttempts.add(runningNode);
+ }
}
}
- task.addAndScheduleAttempt(earliestUnfinishedAttempt);
+ // Pass null, as the pre-change code did, when no unfinished attempt was found,
+ // instead of dereferencing a null attempt.
+ task.addAndScheduleAttempt(
+ earliestUnfinishedAttempt == null ? null : earliestUnfinishedAttempt.getID());
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
index 1cdc217..3191c48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -398,6 +399,9 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
for (Collection<TaskRequest> requests : results) {
if (!requests.isEmpty()) {
TaskRequest request = requests.iterator().next();
+ if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
+ continue;
+ }
assignContainer(request, hc, location);
assignments.add(new Assignment(request, hc.getContainer()));
return;
@@ -515,6 +519,9 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
if (requestTracker.isRequestBlocked(request)) {
LOG.debug("Cannot assign task {} to container {} since vertex {} is a descendant of pending tasks",
request.getTask(), hc.getId(), request.getVertexIndex());
+ } else if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
+ LOG.debug("Cannot assign task {} to container {} since node {} is running sibling attempts",
+ request.getTask(), hc.getId(), request.getVertexIndex());
} else {
assignContainer(request, hc, hc.getId());
return request;
@@ -543,8 +550,10 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
Object signature = hc.getSignature();
if (signature == null || signatureMatcher.isSuperSet(signature, request.getContainerSignature())) {
- assignContainer(request, hc, matchLocation);
- return request;
+ if (!maybeChangeNode(request, hc.getContainer().getNodeId())) {
+ assignContainer(request, hc, matchLocation);
+ return request;
+ }
}
}
}
@@ -1050,7 +1059,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
ContainerId affinity = request.getAffinity();
if (affinity != null) {
HeldContainer hc = heldContainers.get(affinity);
- if (hc != null && hc.isAssignable()) {
+ if (hc != null && hc.isAssignable() && !maybeChangeNode(request, hc.getContainer().getNodeId())) {
assignContainer(request, hc, affinity);
return hc;
}
@@ -1099,12 +1108,13 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
if (eligibleStates.contains(hc.getState())) {
Object csig = hc.getSignature();
if (csig == null || signatureMatcher.isSuperSet(csig, request.getContainerSignature())) {
+ boolean needToChangeNode = maybeChangeNode(request, hc.getContainer().getNodeId());
int numAffinities = hc.getNumAffinities();
- if (numAffinities == 0) {
+ if (numAffinities == 0 && !needToChangeNode) {
bestMatch = hc;
break;
}
- if (bestMatch == null || numAffinities < bestMatch.getNumAffinities()) {
+ if ((bestMatch == null || numAffinities < bestMatch.getNumAffinities()) && !needToChangeNode) {
bestMatch = hc;
}
} else {
@@ -1119,6 +1129,18 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
return bestMatch;
}
+ /**
+ * Tells whether the given node is one the requested task should be kept off because
+ * a sibling attempt is recorded as running there.
+ *
+ * @param request the task request being matched against a container
+ * @param nodeId the node of the candidate container
+ * @return true only when the request's task is a TaskAttempt whose set of
+ * sibling-attempt nodes is non-null and contains nodeId; false otherwise
+ */
+ private boolean maybeChangeNode(TaskRequest request, NodeId nodeId) {
+ final Object requestedTask = request.getTask();
+ if (!(requestedTask instanceof TaskAttempt)) {
+ return false;
+ }
+ final Set<NodeId> siblingNodes =
+ ((TaskAttempt) requestedTask).getNodesWithSiblingRunningAttempts();
+ return siblingNodes != null && siblingNodes.contains(nodeId);
+ }
+
@Override
public void setShouldUnregister() {
shouldUnregister = true;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 95cd85b..f128ec9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.common.ContainerSignatureMatcher;
@@ -118,7 +119,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
new HashMap<ContainerId, HeldContainer>();
Set<Priority> priorityHasAffinity = Sets.newHashSet();
-
+
Set<NodeId> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
@@ -1533,6 +1534,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
private boolean canAssignTaskToContainer(
CookieContainerRequest cookieContainerRequest, Container container) {
HeldContainer heldContainer = heldContainers.get(container.getId());
+ Object task = getTask(cookieContainerRequest);
+ if (task instanceof TaskAttempt
+ && ((TaskAttempt) task).getNodesWithSiblingRunningAttempts().contains(container.getNodeId())) {
+ return false;
+ }
if (heldContainer == null || heldContainer.isNew()) { // New container.
return true;
} else {
@@ -1784,9 +1790,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
Container container = entry.getValue();
// check for blacklisted nodes. There may be race conditions between
// setting blacklist and receiving allocations
+ CookieContainerRequest request = entry.getKey();
+ Object task = getTask(request);
if (blacklistedNodes.contains(container.getNodeId())) {
- CookieContainerRequest request = entry.getKey();
- Object task = getTask(request);
LOG.info("Container: " + container.getId() +
" allocated on blacklisted node: " + container.getNodeId() +
" for task: " + task);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 5038810..ce3e7e5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -2194,7 +2194,7 @@ public class TestTaskAttempt {
eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
isRescheduled, resource, containerContext, leafVertex, mockVertex,
- locationHint, null, null);
+ locationHint, null, null, null);
}
boolean inputFailedReported = false;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b142bb9..51a4bdf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -1366,7 +1366,7 @@ public class TestTaskImpl {
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
super(attemptId, eventHandler, tal, conf, clock, thh,
appContext, isRescheduled, resource, containerContext, false, null,
- locationHint, mockTaskSpec, schedCausalTA);
+ locationHint, mockTaskSpec, schedCausalTA, null);
}
@Override
@@ -1401,6 +1401,11 @@ public class TestTaskImpl {
public ContainerId getAssignedContainerID() {
return mockContainerId;
}
+
+ @Override
+ public NodeId getNodeId() {
+ return mockNodeId; // test stub: always reports mockNodeId as this attempt's node id
+ }
}
public class ServiceBusyEvent extends TezAbstractEvent<TaskAttemptEventType>
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index 911f4b1..c979a7a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.rm;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.AMRMClientAsyncWrapper;
import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.HeldContainer;
import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.TaskRequest;
@@ -343,6 +345,23 @@ public class TestDagAwareYarnTaskScheduler {
verify(mockRMClient, times(8)).addContainerRequest(any(TaskRequest.class));
assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
+ // test speculative node adjustment
+ String speculativeNode = "host8";
+ NodeId speculativeNodeId = mock(NodeId.class);
+ when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
+ TaskAttempt mockTask5 = mock(TaskAttempt.class);
+ when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+ Object mockCookie5 = new Object();
+ scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
+ mockPriority, null, mockCookie5);
+ drainableAppCallback.drain();
+ // no new allocation
+ verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+ // verify container released
+ verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+ // verify request added back
+ verify(mockRMClient, times(9)).addContainerRequest(requestCaptor.capture());
+
List<NodeReport> mockUpdatedNodes = mock(List.class);
scheduler.onNodesUpdated(mockUpdatedNodes);
drainableAppCallback.drain();
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 49f8fe3..21f4c52 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
@@ -346,6 +347,23 @@ public class TestTaskScheduler {
(CookieContainerRequest) any());
assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
+ // test speculative node adjustment
+ String speculativeNode = "host8";
+ NodeId speculativeNodeId = mock(NodeId.class);
+ when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
+ TaskAttempt mockTask5 = mock(TaskAttempt.class);
+ when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+ Object mockCookie5 = new Object();
+ scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
+ mockPriority, null, mockCookie5);
+ drainableAppCallback.drain();
+ // no new allocation
+ verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+ // verify container released
+ verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+ // verify request added back
+ verify(mockRMClient, times(9)).addContainerRequest(requestCaptor.capture());
+
List<NodeReport> mockUpdatedNodes = mock(List.class);
scheduler.onNodesUpdated(mockUpdatedNodes);
drainableAppCallback.drain();