You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/15 03:52:50 UTC
git commit: TEZ-804. Handle node loss/bad nodes (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 720adc32d -> 206a1ed69
TEZ-804. Handle node loss/bad nodes (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/206a1ed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/206a1ed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/206a1ed6
Branch: refs/heads/master
Commit: 206a1ed69e386fa561387a09cdbcce40db14dbb1
Parents: 720adc3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Feb 14 18:52:37 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Feb 14 18:52:37 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 19 +-
.../app/rm/AMSchedulerEventTALaunchRequest.java | 2 +-
.../apache/tez/dag/app/rm/TaskScheduler.java | 36 +++-
.../dag/app/rm/TaskSchedulerEventHandler.java | 20 +-
.../tez/dag/app/rm/TezAMRMClientAsync.java | 6 +
.../dag/app/rm/container/AMContainerImpl.java | 4 +
.../org/apache/tez/dag/app/rm/node/AMNode.java | 1 +
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 41 +++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 36 +++-
.../tez/dag/app/rm/TestTaskScheduler.java | 190 ++++++++++++++++++-
.../tez/dag/app/rm/node/TestAMNodeMap.java | 123 ++++++++++--
.../org/apache/tez/mapreduce/TestMRRJobs.java | 2 +-
.../org/apache/tez/test/TestFaultTolerance.java | 1 +
13 files changed, 445 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 81f5c5a..368406a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -70,6 +71,8 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
@@ -1099,8 +1102,22 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+ TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID();
+ TaskAttempt failedAttempt = task.getAttempt(failedAttemptId);
+ ContainerId containerId = failedAttempt.getAssignedContainerID();
+ if (containerId != null) {
+ AMContainer amContainer = task.appContext.getAllContainers().
+ get(containerId);
+ if (amContainer != null) {
+ // inform the node about failure
+ task.eventHandler.handle(
+ new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(),
+ containerId, failedAttemptId, true));
+ }
+ }
+
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
- !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+ !failedAttemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
return TaskStateInternal.SUCCEEDED;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index bacb83e..14f303d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -26,7 +26,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
- // TODO Get rid of remoteTask from here. Can be forgottent after it has been assigned.
+ // TODO Get rid of remoteTask from here. Can be forgotten after it has been assigned.
//.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
private final TezTaskAttemptID attemptId;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index c6bcfcb..52173f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
@@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -64,6 +66,7 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/* TODO not yet updating cluster nodes on every allocate response
@@ -144,6 +147,8 @@ public class TaskScheduler extends AbstractService
Map<ContainerId, HeldContainer> heldContainers =
new HashMap<ContainerId, HeldContainer>();
+ Set<NodeId> blacklistedNodes = Sets.newConcurrentHashSet();
+
Resource totalResources = Resource.newInstance(0, 0);
Resource allocatedResources = Resource.newInstance(0, 0);
@@ -777,6 +782,11 @@ public class TaskScheduler extends AbstractService
return totalResources;
}
+ public synchronized void blacklistNode(NodeId nodeId) {
+ amRmClient.addNodeToBlacklist(nodeId);
+ blacklistedNodes.add(nodeId);
+ }
+
public synchronized void allocateTask(
Object task,
Resource capability,
@@ -1238,7 +1248,31 @@ public class TaskScheduler extends AbstractService
}
for (Entry<CookieContainerRequest, Container> entry : assignedContainers
.entrySet()) {
- informAppAboutAssignment(entry.getKey(), entry.getValue());
+ Container container = entry.getValue();
+ // check for blacklisted nodes. There may be race conditions between
+ // setting blacklist and receiving allocations
+ if (blacklistedNodes.contains(container.getNodeId())) {
+ CookieContainerRequest request = entry.getKey();
+ Object task = getTask(request);
+ Object clientCookie = request.getCookie().getAppCookie();
+ LOG.info("Container: " + container.getId() +
+ " allocated on blacklisted node: " + container.getNodeId() +
+ " for task: " + task);
+ Object deAllocTask = deallocateContainer(container.getId());
+ assert deAllocTask.equals(task);
+ // its ok to submit the same request again because the RM will not give us
+ // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
+ // and so its better to give the RM the full information.
+ allocateTask(task, request.getCapability(),
+ (request.getNodes() == null ? null :
+ request.getNodes().toArray(new String[request.getNodes().size()])),
+ (request.getRacks() == null ? null :
+ request.getRacks().toArray(new String[request.getRacks().size()])),
+ request.getPriority(),
+ request.getCookie().getContainerSignature(), clientCookie);
+ } else {
+ informAppAboutAssignment(entry.getKey(), container);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 5473e77..550b3a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
@@ -135,7 +136,9 @@ public class TaskSchedulerEventHandler extends AbstractService
case S_CONTAINERS_ALLOCATED:
break;
case S_CONTAINER_COMPLETED:
+ break;
case S_NODE_BLACKLISTED:
+ handleNodeBlacklist((AMSchedulerEventNodeBlacklisted)sEvent);
break;
case S_NODE_UNHEALTHY:
break;
@@ -168,6 +171,9 @@ public class TaskSchedulerEventHandler extends AbstractService
eventHandler.handle(event);
}
+ private void handleNodeBlacklist(AMSchedulerEventNodeBlacklisted event) {
+ taskScheduler.blacklistNode(event.getNodeId());
+ }
private void handleContainerDeallocate(
AMSchedulerEventDeallocateContainer event) {
@@ -325,8 +331,6 @@ public class TaskSchedulerEventHandler extends AbstractService
@Override
public synchronized void serviceStart() {
- // FIXME hack alert how is this supposed to support multiple DAGs?
- // Answer: this is shared across dags. need job==app-dag-master
InetSocketAddress serviceAddr = clientService.getBindAddress();
dagAppMaster = appContext.getAppMaster();
taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
@@ -395,7 +399,7 @@ public class TaskSchedulerEventHandler extends AbstractService
// because the deallocateTask downcall may have raced with the
// taskAllocated() upcall
assert task.equals(taskAttempt);
-
+
if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
event.getContainerContext()));
@@ -409,12 +413,18 @@ public class TaskSchedulerEventHandler extends AbstractService
@Override
public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
// Inform the Containers about completion.
- sendEvent(new AMContainerEventCompleted(containerStatus));
+ AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
+ if (amContainer != null) {
+ sendEvent(new AMContainerEventCompleted(containerStatus));
+ }
}
@Override
public synchronized void containerBeingReleased(ContainerId containerId) {
- sendEvent(new AMContainerEventStopRequest(containerId));
+ AMContainer amContainer = appContext.getAllContainers().get(containerId);
+ if (amContainer != null) {
+ sendEvent(new AMContainerEventStopRequest(containerId));
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index 487c57a..e90ed0f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -80,6 +81,11 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
}
return iter.next();
}
+
+ // Remove after YARN-1723 is fixed
+ public synchronized void addNodeToBlacklist(NodeId nodeId) {
+ client.updateBlacklist(Collections.singletonList(nodeId.getHost()), null);
+ }
@Override
public synchronized void addContainerRequest(T req) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1ce0d89..d782be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -717,6 +717,10 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.nodeFailed) {
+ // ignore duplicates
+ return;
+ }
container.nodeFailed = true;
String errorMessage = null;
if (cEvent instanceof DiagnosableEvent) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
index 37f3f1b..1c34816 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
@@ -32,4 +32,5 @@ public interface AMNode extends EventHandler<AMNodeEvent> {
public boolean isUnhealthy();
public boolean isBlacklisted();
+ public boolean isUsable();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 9ee8a3c..256eb1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm.node;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -41,6 +42,10 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
public class AMNodeImpl implements AMNode {
@@ -51,16 +56,18 @@ public class AMNodeImpl implements AMNode {
private final NodeId nodeId;
private final AppContext appContext;
private final int maxTaskFailuresPerNode;
- private int numFailedTAs = 0;
- private int numSuccessfulTAs = 0;
private boolean blacklistingEnabled;
private boolean ignoreBlacklisting = false;
+ private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
- private final List<ContainerId> containers = new LinkedList<ContainerId>();
-
+ @VisibleForTesting
+ final List<ContainerId> containers = new LinkedList<ContainerId>();
+ int numFailedTAs = 0;
+ int numSuccessfulTAs = 0;
+
//Book-keeping only. In case of Health status change.
private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
@@ -174,6 +181,7 @@ public class AMNodeImpl implements AMNode {
this.nodeId = nodeId;
this.appContext = appContext;
this.eventHandler = eventHandler;
+ this.blacklistingEnabled = blacklistingEnabled;
this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
this.stateMachine = stateMachineFactory.make(this);
// TODO Handle the case where a node is created due to the RM reporting it's
@@ -237,6 +245,9 @@ public class AMNodeImpl implements AMNode {
}
protected void blacklistSelf() {
+ for (ContainerId c : containers) {
+ sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ }
sendEvent(new AMNodeEvent(getNodeId(),
AMNodeEventType.N_NODE_WAS_BLACKLISTED));
sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
@@ -274,11 +285,17 @@ public class AMNodeImpl implements AMNode {
public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
if (event.failed()) {
- node.numFailedTAs++;
- boolean shouldBlacklist = node.shouldBlacklistNode();
- if (shouldBlacklist) {
- node.blacklistSelf();
- return AMNodeState.BLACKLISTED;
+ // ignore duplicate attempt ids
+ if (node.failedAttemptIds.add(event.getTaskAttemptId())) {
+ // new failed container on node
+ node.numFailedTAs++;
+ boolean shouldBlacklist = node.shouldBlacklistNode();
+ if (shouldBlacklist) {
+ LOG.info("Too many task attempt failures. " +
+ "Blacklisting node: " + node.getNodeId());
+ node.blacklistSelf();
+ return AMNodeState.BLACKLISTED;
+ }
}
}
return AMNodeState.ACTIVE;
@@ -409,6 +426,7 @@ public class AMNodeImpl implements AMNode {
}
}
+ @Override
public boolean isBlacklisted() {
this.readLock.lock();
try {
@@ -417,4 +435,9 @@ public class AMNodeImpl implements AMNode {
this.readLock.unlock();
}
}
+
+ @Override
+ public boolean isUsable() {
+ return !(isUnhealthy() || isBlacklisted());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 451ba07..b18e2f6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,7 +37,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -57,6 +62,10 @@ import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.node.AMNodeEvent;
+import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.app.rm.node.AMNodeMap;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -92,6 +101,10 @@ public class TestTaskImpl {
private String javaOpts;
private boolean leafVertex;
private ContainerContext containerContext;
+ private ContainerId mockContainerId;
+ private Container mockContainer;
+ private AMContainer mockAMContainer;
+ private NodeId mockNodeId;
private MockTaskImpl mockTask;
@@ -117,7 +130,15 @@ public class TestTaskImpl {
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
dagId = TezDAGID.getInstance(appId, 1);
vertexId = TezVertexID.getInstance(dagId, 1);
- appContext = mock(AppContext.class);
+ appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ mockContainerId = mock(ContainerId.class);
+ mockContainer = mock(Container.class);
+ mockAMContainer = mock(AMContainer.class);
+ mockNodeId = mock(NodeId.class);
+ when(mockContainer.getId()).thenReturn(mockContainerId);
+ when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+ when(mockAMContainer.getContainer()).thenReturn(mockContainer);
+ when(appContext.getAllContainers().get(mockContainerId)).thenReturn(mockAMContainer);
taskResource = Resource.newInstance(1024, 1);
localResources = new HashMap<String, LocalResource>();
environment = new HashMap<String, String>();
@@ -127,6 +148,7 @@ public class TestTaskImpl {
environment, javaOpts);
Vertex vertex = mock(Vertex.class);
eventHandler = new TestEventHandler();
+
mockTask = new MockTaskImpl(vertexId, partition,
eventHandler, conf, taskAttemptListener, clock,
taskHeartbeatHandler, appContext, leafVertex, locationHint,
@@ -477,13 +499,16 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
+ eventHandler.events.clear();
// Now fail the attempt after it has succeeded
mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
.getID(), TaskEventType.T_ATTEMPT_FAILED));
// The task should still be in the scheduled state
assertTaskScheduledState();
- Event event = eventHandler.events.get(eventHandler.events.size()-1);
+ Event event = eventHandler.events.get(0);
+ Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
+ event = eventHandler.events.get(eventHandler.events.size()-1);
Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
}
@@ -494,7 +519,7 @@ public class TestTaskImpl {
private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
private Vertex vertex;
-
+
public MockTaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Configuration conf,
TaskAttemptListener taskAttemptListener, Clock clock,
@@ -583,6 +608,11 @@ public class TestTaskImpl {
public TaskAttemptState getStateNoLock() {
return state;
}
+
+ @Override
+ public ContainerId getAssignedContainerID() {
+ return mockContainerId;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index c8e8d1b..a01412c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -86,7 +86,7 @@ public class TestTaskScheduler {
RecordFactoryProvider.getRecordFactory(null);
@SuppressWarnings({ "unchecked" })
- @Test
+ @Test(timeout=10000)
public void testTaskSchedulerNoReuse() throws Exception {
RackResolver.init(new YarnConfiguration());
TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
@@ -346,8 +346,98 @@ public class TestTaskScheduler {
// no other statuses returned
verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+
+ // verify blacklisting
+ verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any());
+ String badHost = "host6";
+ NodeId badNodeId = mock(NodeId.class);
+ when(badNodeId.getHost()).thenReturn(badHost);
+ scheduler.blacklistNode(badNodeId);
+ verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
+ Object mockTask4 = mock(Object.class);
+ Object mockCookie4 = mock(Object.class);
+ scheduler.allocateTask(mockTask4, mockCapability, null,
+ null, mockPriority, null, mockCookie4);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request4 = requestCaptor.getValue();
+ anyContainers.clear();
+ anyContainers.add(request4);
+ Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
+ when(mockContainer5.getNodeId()).thenReturn(badNodeId);
+ ContainerId mockCId5 = mock(ContainerId.class);
+ when(mockContainer5.getId()).thenReturn(mockCId5);
+ containers.clear();
+ containers.add(mockContainer5);
+ when(
+ mockRMClient.getMatchingRequests((Priority) any(),
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+ });
+ scheduler.onContainersAllocated(containers);
+ drainableAppCallback.drain();
+ // no new allocation
+ verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+ // verify blacklisted container released
+ verify(mockRMClient).releaseAssignedContainer(mockCId5);
+ verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
+ // verify request added back
+ verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request5 = requestCaptor.getValue();
+ anyContainers.clear();
+ anyContainers.add(request5);
+ Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
+ ContainerId mockCId6 = mock(ContainerId.class);
+ when(mockContainer6.getId()).thenReturn(mockCId6);
+ containers.clear();
+ containers.add(mockContainer6);
+ when(
+ mockRMClient.getMatchingRequests((Priority) any(),
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+ scheduler.onContainersAllocated(containers);
+ drainableAppCallback.drain();
+ // new allocation
+ verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+ verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
+ // deallocate allocated task
+ assertTrue(scheduler.deallocateTask(mockTask4, true));
+ drainableAppCallback.drain();
+ verify(mockApp).containerBeingReleased(mockCId6);
+ verify(mockRMClient).releaseAssignedContainer(mockCId6);
+ verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
Assert.assertEquals(progress, scheduler.getProgress(), 0);
@@ -433,6 +523,7 @@ public class TestTaskScheduler {
final Priority mockPriority1 = Priority.newInstance(1);
final Priority mockPriority2 = Priority.newInstance(2);
final Priority mockPriority3 = Priority.newInstance(3);
+ final Priority mockPriority4 = Priority.newInstance(3);
Object mockTask2 = mock(Object.class);
Object mockCookie2 = mock(Object.class);
Object mockTask3 = mock(Object.class);
@@ -587,6 +678,9 @@ public class TestTaskScheduler {
if (allocations == 2) {
return mockPriority3;
}
+ if (allocations == 3) {
+ return mockPriority4;
+ }
return null;
}
});
@@ -643,6 +737,100 @@ public class TestTaskScheduler {
verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+ // verify blacklisting
+ verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any());
+ String badHost = "host6";
+ NodeId badNodeId = mock(NodeId.class);
+ when(badNodeId.getHost()).thenReturn(badHost);
+ scheduler.blacklistNode(badNodeId);
+ verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
+ Object mockTask4 = mock(Object.class);
+ Object mockCookie4 = mock(Object.class);
+ scheduler.allocateTask(mockTask4, mockCapability, null,
+ null, mockPriority4, null, mockCookie4);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request4 = requestCaptor.getValue();
+ anyContainers.clear();
+ anyContainers.add(request4);
+ Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
+ when(mockContainer5.getNodeId()).thenReturn(badNodeId);
+ ContainerId mockCId5 = mock(ContainerId.class);
+ when(mockContainer5.getId()).thenReturn(mockCId5);
+ containers.clear();
+ containers.add(mockContainer5);
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+ drainNotifier.set(false);
+ scheduler.onContainersAllocated(containers);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ // no new allocation
+ verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+ // verify blacklisted container released
+ verify(mockRMClient).releaseAssignedContainer(mockCId5);
+ verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
+ // verify request added back
+ verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request5 = requestCaptor.getValue();
+ anyContainers.clear();
+ anyContainers.add(request5);
+ Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
+ ContainerId mockCId6 = mock(ContainerId.class);
+ when(mockContainer6.getId()).thenReturn(mockCId6);
+ containers.clear();
+ containers.add(mockContainer6);
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(
+ eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return anyList;
+ }
+
+ }).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return emptyList;
+ }
+
+ });
+ drainNotifier.set(false);
+ scheduler.onContainersAllocated(containers);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ // new allocation
+ verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+ verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
+ // deallocate allocated task
+ assertTrue(scheduler.deallocateTask(mockTask4, true));
+ drainableAppCallback.drain();
+ verify(mockApp).containerBeingReleased(mockCId6);
+ verify(mockRMClient).releaseAssignedContainer(mockCId6);
+ verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
index 31304a9..b7ce891 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
@@ -21,30 +21,67 @@ package org.apache.tez.dag.app.rm.node;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-public class TestAMNodeMap {
+import com.google.common.collect.Lists;
- @Test
- @SuppressWarnings({ "resource", "rawtypes" })
- public void testHealthUpdateKnownNode() {
+@SuppressWarnings({ "resource", "rawtypes" })
+public class TestAMNodeMap {
- DrainDispatcher dispatcher = new DrainDispatcher();
+ // Shared fixture: a DrainDispatcher that setup() creates and starts before
+ // each test and teardown() stops afterwards.
+ DrainDispatcher dispatcher;
+ // The dispatcher's event handler. Tests pass it to AMNodeMap directly, or
+ // wrap it in a TestEventHandler that records events before forwarding here.
+ EventHandler eventHandler;
+
+ /**
+ * Creates, initializes and starts a fresh DrainDispatcher and caches its
+ * event handler, so no dispatcher instance is shared between tests.
+ */
+ @Before
+ public void setup() {
+ dispatcher = new DrainDispatcher();
dispatcher.init(new Configuration());
dispatcher.start();
- EventHandler eventHandler = dispatcher.getEventHandler();
-
+ eventHandler = dispatcher.getEventHandler();
+ }
+
+ /**
+ * Recording EventHandler: appends every event it receives to {@link #events}
+ * in arrival order, then forwards it to the fixture's dispatcher handler.
+ * Non-static because it reads the enclosing test's eventHandler field.
+ */
+ class TestEventHandler implements EventHandler {
+ // Captured events, oldest first; tests assert on them by index.
+ List<Event> events = Lists.newLinkedList();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handle(Event incoming) {
+ // Record before forwarding so the event is captured even if forwarding throws.
+ events.add(incoming);
+ eventHandler.handle(incoming);
+ }
+ }
+
+ /** Stops the DrainDispatcher that setup() created and started. */
+ @After
+ public void teardown() {
+ this.dispatcher.stop();
+ }
+
+ @Test(timeout=5000)
+ public void testHealthUpdateKnownNode() {
AppContext appContext = mock(AppContext.class);
AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+ amNodeMap.init(new Configuration(false));
+ amNodeMap.start();
NodeId nodeId = NodeId.newInstance("host1", 2342);
amNodeMap.nodeSeen(nodeId);
@@ -53,27 +90,85 @@ public class TestAMNodeMap {
amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
dispatcher.await();
assertEquals(AMNodeState.UNHEALTHY, amNodeMap.get(nodeId).getState());
- dispatcher.stop();
+ amNodeMap.stop();
}
- @Test
- @SuppressWarnings({ "resource", "rawtypes" })
+ /**
+ * A state-change report for a node that AMNodeMap has never seen must be
+ * tolerated: the test passes as long as no exception escapes.
+ */
+ @Test(timeout=5000)
public void testHealthUpdateUnknownNode() {
- DrainDispatcher dispatcher = new DrainDispatcher();
- EventHandler eventHandler = dispatcher.getEventHandler();
-
AppContext appContext = mock(AppContext.class);
AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+ amNodeMap.init(new Configuration(false));
+ amNodeMap.start();
NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
+ // Presumably blocks until the dispatcher has drained any events the update queued.
+ dispatcher.await();
+ // NOTE(review): stop() is skipped if anything above throws; consider try/finally.
+ amNodeMap.stop();
// No exceptions - the status update was ignored. Not bothering to capture
// the log message for verification.
- dispatcher.stop();
+ }
+
+ /**
+ * With TEZ_AM_MAX_TASK_FAILURES_PER_NODE = 2, a node that records two distinct
+ * failed task attempts moves from ACTIVE to BLACKLISTED. A repeated report for
+ * the same attempt must not be counted twice. The test then checks the exact
+ * events emitted, in order:
+ * - one C_NODE_FAILED per container, in allocation order;
+ * - then N_NODE_WAS_BLACKLISTED;
+ * - then S_NODE_BLACKLISTED.
+ */
+ @Test(timeout=5000)
+ public void testNodeSelfBlacklist() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ // Threshold under test: the second failed attempt on this node should blacklist it.
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ // Recording handler, so the events AMNodeMap emits can be asserted at the end.
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
+ amNodeMap.init(conf);
+ amNodeMap.start();
+
+ NodeId nodeId = NodeId.newInstance("host1", 1234);
+ amNodeMap.nodeSeen(nodeId);
+ // Cast to the impl to read its package-private counters and container set.
+ AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
+
+ ContainerId cId1 = mock(ContainerId.class);
+ ContainerId cId2 = mock(ContainerId.class);
+ ContainerId cId3 = mock(ContainerId.class);
+
+ amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
+ amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+ amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
+ assertEquals(3, node.containers.size());
+
+ TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
+
+ amNodeMap.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
+ assertEquals(1, node.numSuccessfulTAs);
+
+ // The trailing 'true' is counted as a failure here (numFailedTAs goes to 1).
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+ assertEquals(1, node.numSuccessfulTAs);
+ assertEquals(1, node.numFailedTAs);
+ assertEquals(AMNodeState.ACTIVE, node.getState());
+ // duplicate should not affect anything
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+ assertEquals(1, node.numSuccessfulTAs);
+ assertEquals(1, node.numFailedTAs);
+ assertEquals(AMNodeState.ACTIVE, node.getState());
+
+ // NOTE(review): ta3 is reported on cId2, not cId3. Because this still counts,
+ // de-duplication appears to be per attempt rather than per container.
+ // Confirm that using cId2 here was intended.
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta3, true));
+ assertEquals(1, node.numSuccessfulTAs);
+ assertEquals(2, node.numFailedTAs);
+ assertEquals(AMNodeState.BLACKLISTED, node.getState());
+
+ // No dispatcher.await() here: TestEventHandler records events synchronously,
+ // presumably as AMNodeImpl sends them.
+ assertEquals(5, handler.events.size());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+ assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
+ assertEquals(cId2, ((AMContainerEventNodeFailed)handler.events.get(1)).getContainerId());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(2).getType());
+ assertEquals(cId3, ((AMContainerEventNodeFailed)handler.events.get(2)).getContainerId());
+ assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
+ assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(3)).getNodeId());
+ assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(4).getType());
+ assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklisted)handler.events.get(4)).getNodeId());
+ // NOTE(review): not reached if an assertion above fails; consider try/finally.
+ amNodeMap.stop();
}
private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index f98f392..b39457a 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -80,7 +80,7 @@ public class TestMRRJobs {
}
if (mrrTezCluster == null) {
- mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+ mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 3,
1, 1);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 9569f6e..db259b1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -91,6 +91,7 @@ public class TestFaultTolerance {
TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
AMConfiguration amConfig = new AMConfiguration(
new HashMap<String, String>(), new HashMap<String, LocalResource>(),