You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/19 23:42:53 UTC
git commit: TEZ-845. Handle un-blacklisting of nodes (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 816e2e5a9 -> 972dbc6fb
TEZ-845. Handle un-blacklisting of nodes (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/972dbc6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/972dbc6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/972dbc6f
Branch: refs/heads/master
Commit: 972dbc6fb2afb983b3e780ce19b3c76c24e67c7d
Parents: 816e2e5
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 19 14:42:10 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 19 14:42:23 2014 -0800
----------------------------------------------------------------------
.../rm/AMSchedulerEventNodeBlacklistUpdate.java | 36 +++++++++++
.../app/rm/AMSchedulerEventNodeBlacklisted.java | 35 -----------
.../tez/dag/app/rm/AMSchedulerEventType.java | 1 +
.../apache/tez/dag/app/rm/TaskScheduler.java | 7 +++
.../dag/app/rm/TaskSchedulerEventHandler.java | 25 +++++++-
.../tez/dag/app/rm/TezAMRMClientAsync.java | 5 ++
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 10 ++-
.../apache/tez/dag/app/rm/node/AMNodeMap.java | 4 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 10 ++-
.../tez/dag/app/rm/node/TestAMNodeMap.java | 64 ++++++++++++++++++--
.../org/apache/tez/mapreduce/TestMRRJobs.java | 2 +-
11 files changed, 152 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
new file mode 100644
index 0000000..ed7ebc3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
+
+ private final NodeId nodeId;
+
+ public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) {
+ super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
+ : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
+ this.nodeId = nodeId;
+ }
+
+ public NodeId getNodeId() {
+ return this.nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
deleted file mode 100644
index 9cf3d65..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-
-public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
-
- private final NodeId nodeId;
-
- public AMSchedulerEventNodeBlacklisted(NodeId nodeId) {
- super(AMSchedulerEventType.S_NODE_BLACKLISTED);
- this.nodeId = nodeId;
- }
-
- public NodeId getNodeId() {
- return this.nodeId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
index 0554cff..8a4c371 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
@@ -31,6 +31,7 @@ public enum AMSchedulerEventType {
//Producer: Node
S_NODE_BLACKLISTED,
+ S_NODE_UNBLACKLISTED,
S_NODE_UNHEALTHY,
S_NODE_HEALTHY,
// The scheduler should have a way of knowing about unusable nodes. Acting on
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 814af6b..6ac9481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -262,6 +262,7 @@ public class TaskScheduler extends AbstractService
}
public int getClusterNodeCount() {
+ // this can potentially be cheaper after YARN-1722
return amRmClient.getClusterNodeCount();
}
@@ -790,6 +791,12 @@ public class TaskScheduler extends AbstractService
blacklistedNodes.add(nodeId);
}
+ public synchronized void unblacklistNode(NodeId nodeId) {
+ if (blacklistedNodes.remove(nodeId)) {
+ amRmClient.removeNodeFromBlacklist(nodeId);
+ }
+ }
+
public synchronized void allocateTask(
Object task,
Resource capability,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 550b3a2..83816b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -58,6 +58,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
@@ -79,6 +80,7 @@ public class TaskSchedulerEventHandler extends AbstractService
protected volatile boolean isSignalled = false;
final DAGClientServer clientService;
private final ContainerSignatureMatcher containerSignatureMatcher;
+ private int cachedNodeCount = -1;
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
@@ -137,14 +139,18 @@ public class TaskSchedulerEventHandler extends AbstractService
break;
case S_CONTAINER_COMPLETED:
break;
+ case S_NODE_UNBLACKLISTED:
+ // fall through
case S_NODE_BLACKLISTED:
- handleNodeBlacklist((AMSchedulerEventNodeBlacklisted)sEvent);
+ handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
break;
case S_NODE_UNHEALTHY:
break;
case S_NODE_HEALTHY:
// Consider changing this to work like BLACKLISTING.
break;
+ default:
+ break;
}
}
@@ -171,8 +177,14 @@ public class TaskSchedulerEventHandler extends AbstractService
eventHandler.handle(event);
}
- private void handleNodeBlacklist(AMSchedulerEventNodeBlacklisted event) {
- taskScheduler.blacklistNode(event.getNodeId());
+ private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
+ if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+ taskScheduler.blacklistNode(event.getNodeId());
+ } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+ taskScheduler.unblacklistNode(event.getNodeId());
+ } else {
+ throw new TezUncheckedException("Invalid event type: " + event.getType());
+ }
}
private void handleContainerDeallocate(
@@ -507,6 +519,13 @@ public class TaskSchedulerEventHandler extends AbstractService
// complete and can hence lead to a deadlock if called from within a TSEH lock.
@Override
public float getProgress() {
+ // at this point allocate has been called and so node count must be available
+ // may change after YARN-1722
+ int nodeCount = taskScheduler.getClusterNodeCount();
+ if (nodeCount != cachedNodeCount) {
+ cachedNodeCount = nodeCount;
+ sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
+ }
return dagAppMaster.getProgress();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index e90ed0f..dad9473 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -86,6 +86,11 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
public synchronized void addNodeToBlacklist(NodeId nodeId) {
client.updateBlacklist(Collections.singletonList(nodeId.getHost()), null);
}
+
+ //Remove after YARN-1723 is fixed
+ public synchronized void removeNodeFromBlacklist(NodeId nodeId) {
+ client.updateBlacklist(null, Collections.singletonList(nodeId.getHost()));
+ }
@Override
public synchronized void addContainerRequest(T req) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 256eb1f..65c4b2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
@@ -248,9 +248,12 @@ public class AMNodeImpl implements AMNode {
for (ContainerId c : containers) {
sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
}
+ // these containers are not useful anymore
+ pastContainers.addAll(containers);
+ containers.clear();
sendEvent(new AMNodeEvent(getNodeId(),
AMNodeEventType.N_NODE_WAS_BLACKLISTED));
- sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
+ sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
}
@SuppressWarnings("unchecked")
@@ -343,6 +346,9 @@ public class AMNodeImpl implements AMNode {
@Override
public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
node.ignoreBlacklisting = ignore;
+ if (node.getState() == AMNodeState.BLACKLISTED) {
+ node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
index 0336a9e..f815836 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
@@ -132,13 +132,15 @@ public class AMNodeMap extends AbstractService implements
NodeId nodeId = rEvent.getNodeId();
switch (rEvent.getType()) {
case N_NODE_WAS_BLACKLISTED:
- // When moving away from IGNORE_BLACKLISTING state, nodes will send out blacklisted events. These need to be ignored.
+ // When moving away from IGNORE_BLACKLISTING state, nodes will send out
+ // blacklisted events. These need to be ignored.
addToBlackList(nodeId);
computeIgnoreBlacklisting();
break;
case N_NODE_COUNT_UPDATED:
AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
numClusterNodes = event.getNodeCount();
+ LOG.info("Num cluster nodes = " + numClusterNodes);
computeIgnoreBlacklisting();
break;
case N_TURNED_UNHEALTHY:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index a01412c..ff4036d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -437,7 +437,11 @@ public class TestTaskScheduler {
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
-
+ // test unblacklist
+ scheduler.unblacklistNode(badNodeId);
+ verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
+ assertEquals(0, scheduler.blacklistedNodes.size());
+
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
Assert.assertEquals(progress, scheduler.getProgress(), 0);
@@ -831,6 +835,10 @@ public class TestTaskScheduler {
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+ // test unblacklist
+ scheduler.unblacklistNode(badNodeId);
+ verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
+ assertEquals(0, scheduler.blacklistedNodes.size());
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
index b7ce891..a5b6e58 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
@@ -34,9 +34,8 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -119,11 +118,19 @@ public class TestAMNodeMap {
conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
TestEventHandler handler = new TestEventHandler();
AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
+ dispatcher.register(AMNodeEventType.class, amNodeMap);
amNodeMap.init(conf);
amNodeMap.start();
+ amNodeMap.handle(new AMNodeEventNodeCountUpdated(4));
NodeId nodeId = NodeId.newInstance("host1", 1234);
+ NodeId nodeId2 = NodeId.newInstance("host2", 1234);
+ NodeId nodeId3 = NodeId.newInstance("host3", 1234);
+ NodeId nodeId4 = NodeId.newInstance("host4", 1234);
amNodeMap.nodeSeen(nodeId);
+ amNodeMap.nodeSeen(nodeId2);
+ amNodeMap.nodeSeen(nodeId3);
+ amNodeMap.nodeSeen(nodeId4);
AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
ContainerId cId1 = mock(ContainerId.class);
@@ -152,7 +159,8 @@ public class TestAMNodeMap {
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
- amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta3, true));
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
+ dispatcher.await();
assertEquals(1, node.numSuccessfulTAs);
assertEquals(2, node.numFailedTAs);
assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -167,7 +175,55 @@ public class TestAMNodeMap {
assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(3)).getNodeId());
assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(4).getType());
- assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklisted)handler.events.get(4)).getNodeId());
+ assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklistUpdate)handler.events.get(4)).getNodeId());
+
+ ContainerId cId4 = mock(ContainerId.class);
+ ContainerId cId5 = mock(ContainerId.class);
+ TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
+ AMNodeImpl node2 = (AMNodeImpl) amNodeMap.get(nodeId2);
+ amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
+ amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
+
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
+ assertEquals(1, node2.numFailedTAs);
+ assertEquals(AMNodeState.ACTIVE, node2.getState());
+
+ handler.events.clear();
+ amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
+ dispatcher.await();
+ assertEquals(2, node2.numFailedTAs);
+ assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
+ AMNodeImpl node3 = (AMNodeImpl)amNodeMap.get(nodeId3);
+ assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
+ assertEquals(10, handler.events.size());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
+ assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(2).getType());
+ assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(3).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(4).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(5).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(6).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(7).getType());
+ assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(8).getType());
+ assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(9).getType());
+
+ handler.events.clear();
+ amNodeMap.handle(new AMNodeEventNodeCountUpdated(8));
+ dispatcher.await();
+ assertEquals(AMNodeState.BLACKLISTED, node.getState());
+ assertEquals(AMNodeState.BLACKLISTED, node2.getState());
+ assertEquals(AMNodeState.ACTIVE, node3.getState());
+ assertEquals(8, handler.events.size());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(0).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(1).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(2).getType());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(3).getType());
+ assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(4).getType());
+ assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(5).getType());
+ assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(6).getType());
+ assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(7).getType());
+
amNodeMap.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index b39457a..f98f392 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -80,7 +80,7 @@ public class TestMRRJobs {
}
if (mrrTezCluster == null) {
- mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 3,
+ mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
1, 1);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS