You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/12 19:24:57 UTC
tez git commit: TEZ-2713. Add tests for node handling when there's
multiple schedulers. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 2fe54840c -> c9965b906
TEZ-2713. Add tests for node handling when there's multiple schedulers. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9965b90
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9965b90
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9965b90
Branch: refs/heads/TEZ-2003
Commit: c9965b9063c12775f2141ed2f35917d3fa98f2ce
Parents: 2fe5484
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 12 10:24:33 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 12 10:24:33 2015 -0700
----------------------------------------------------------------------
.../tez/dag/app/rm/node/AMNodeTracker.java | 8 +
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 275 +++++++++++++++----
2 files changed, 231 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c9965b90/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 751276e..1aa8472 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -116,6 +116,14 @@ public class AMNodeTracker extends AbstractService implements
return perSourceNodeTrackers.get(schedulerId).get(nodeId);
}
+ /**
+ * Retrieve the number of nodes from this source on which containers may be running.
+ * <p>
+ * This number may differ from the total number of nodes available from the source.
+ *
+ * @param schedulerId the schedulerId for which the node count is required
+ * @return the number of nodes from the scheduler on which containers have been allocated
+ */
public int getNumNodes(int schedulerId) {
return perSourceNodeTrackers.get(schedulerId).getNumNodes();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c9965b90/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 84d2e1f..def80da 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -19,6 +19,9 @@
package org.apache.tez.dag.app.rm.node;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
@@ -123,6 +126,55 @@ public class TestAMNodeTracker {
}
@Test (timeout = 5000)
+ public void testMultipleSourcesNodeRegistration() {
+ AppContext appContext = mock(AppContext.class);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+
+ amNodeTracker.init(new Configuration(false));
+ amNodeTracker.start();
+
+ NodeId nodeId1 = NodeId.newInstance("source01", 3333);
+ NodeId nodeId2 = NodeId.newInstance("source02", 3333);
+
+ amNodeTracker.nodeSeen(nodeId1, 0);
+ amNodeTracker.nodeSeen(nodeId2, 1);
+
+ assertEquals(1, amNodeTracker.getNumNodes(0));
+ assertEquals(1, amNodeTracker.getNumNodes(1));
+ assertNotNull(amNodeTracker.get(nodeId1, 0));
+ assertNull(amNodeTracker.get(nodeId2, 0));
+ assertNull(amNodeTracker.get(nodeId1, 1));
+ assertNotNull(amNodeTracker.get(nodeId2, 1));
+ }
+
+ @Test (timeout = 5000)
+ public void testMultipleSourcesNodeCountUpdated() {
+ AppContext appContext = mock(AppContext.class);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+
+ amNodeTracker.init(new Configuration(false));
+ amNodeTracker.start();
+
+ NodeId nodeId1 = NodeId.newInstance("source01", 3333);
+ NodeId nodeId2 = NodeId.newInstance("source02", 3333);
+
+ amNodeTracker.nodeSeen(nodeId1, 0);
+ amNodeTracker.nodeSeen(nodeId2, 1);
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(10, 0));
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(20, 1));
+
+ // A NodeCountUpdated event is not reflected in getNumNodes.
+ assertEquals(1, amNodeTracker.getNumNodes(0));
+ assertEquals(1, amNodeTracker.getNumNodes(1));
+ assertNotNull(amNodeTracker.get(nodeId1, 0));
+ assertNull(amNodeTracker.get(nodeId2, 0));
+ assertNull(amNodeTracker.get(nodeId1, 1));
+ assertNotNull(amNodeTracker.get(nodeId2, 1));
+ }
+
+ @Test (timeout = 5000)
public void testSingleNodeNotBlacklisted() {
AppContext appContext = mock(AppContext.class);
Configuration conf = new Configuration(false);
@@ -142,32 +194,61 @@ public class TestAMNodeTracker {
amNodeTracker.init(conf);
amNodeTracker.start();
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
- NodeId nodeId = NodeId.newInstance("host1", 1234);
- amNodeTracker.nodeSeen(nodeId, 0);
+ _testSingleNodeNotBlacklisted(amNodeTracker, handler, 0);
+ }
- AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
+ @Test (timeout = 5000)
+ public void testSingleNodeNotBlacklistedAlternateScheduler() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
+ conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33);
- ContainerId cId1 = mock(ContainerId.class);
- ContainerId cId2 = mock(ContainerId.class);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ TaskSchedulerEventHandler taskSchedulerEventHandler =
+ mock(TaskSchedulerEventHandler.class);
+ dispatcher.register(AMNodeEventType.class, amNodeTracker);
+ dispatcher.register(AMContainerEventType.class, amContainerMap);
+ dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
+ _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1);
+ }
- TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
- TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+ @Test (timeout = 5000)
+ public void testSingleNodeNotBlacklistedAlternateScheduler2() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
+ conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33);
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true));
- dispatcher.await();
- assertEquals(1, node.numFailedTAs);
- assertEquals(AMNodeState.ACTIVE, node.getState());
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ TaskSchedulerEventHandler taskSchedulerEventHandler =
+ mock(TaskSchedulerEventHandler.class);
+ dispatcher.register(AMNodeEventType.class, amNodeTracker);
+ dispatcher.register(AMContainerEventType.class, amContainerMap);
+ dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
- dispatcher.await();
- assertEquals(2, node.numFailedTAs);
- assertEquals(1, handler.events.size());
- assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType());
- assertEquals(AMNodeState.FORCED_ACTIVE, node.getState());
+ // Register multiple nodes from a scheduler which isn't being tested.
+ // This should not affect the blacklisting behaviour
+ for (int i = 0 ; i < 10 ; i++) {
+ amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0);
+ }
+
+ _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1);
+ // No impact on blacklisting for the alternate source
+ assertFalse(amNodeTracker.isBlacklistingIgnored(0));
}
@Test(timeout=10000)
@@ -186,50 +267,142 @@ public class TestAMNodeTracker {
dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
amNodeTracker.init(conf);
amNodeTracker.start();
+ try {
+ _testNodeSelfBlacklist(amNodeTracker, handler, 0);
+ } finally {
+ amNodeTracker.stop();
+ }
+ }
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0));
+ @Test(timeout=10000)
+ public void testNodeSelfBlacklistAlternateScheduler1() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ TaskSchedulerEventHandler taskSchedulerEventHandler =
+ mock(TaskSchedulerEventHandler.class);
+ dispatcher.register(AMNodeEventType.class, amNodeTracker);
+ dispatcher.register(AMContainerEventType.class, amContainerMap);
+ dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
+ try {
+ _testNodeSelfBlacklist(amNodeTracker, handler, 1);
+ } finally {
+ amNodeTracker.stop();
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testNodeSelfBlacklistAlternateScheduler2() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ TaskSchedulerEventHandler taskSchedulerEventHandler =
+ mock(TaskSchedulerEventHandler.class);
+ dispatcher.register(AMNodeEventType.class, amNodeTracker);
+ dispatcher.register(AMContainerEventType.class, amContainerMap);
+ dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
+ try {
+ // Register multiple nodes from a scheduler which isn't being tested.
+ // This should not affect the blacklisting behaviour
+ for (int i = 0 ; i < 100 ; i++) {
+ amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0);
+ }
+ _testNodeSelfBlacklist(amNodeTracker, handler, 1);
+ assertFalse(amNodeTracker.isBlacklistingIgnored(0));
+ } finally {
+ amNodeTracker.stop();
+ }
+ }
+
+ private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker,
+ TestEventHandler handler, int schedulerId) {
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId));
+ NodeId nodeId = NodeId.newInstance("host1", 1234);
+ amNodeTracker.nodeSeen(nodeId, schedulerId);
+
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId);
+
+ ContainerId cId1 = mock(ContainerId.class);
+ ContainerId cId2 = mock(ContainerId.class);
+
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2));
+
+ TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+ TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId1, ta1, true));
+ dispatcher.await();
+ assertEquals(1, node.numFailedTAs);
+ assertEquals(AMNodeState.ACTIVE, node.getState());
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
+ dispatcher.await();
+ assertEquals(2, node.numFailedTAs);
+ assertEquals(1, handler.events.size());
+ assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType());
+ assertEquals(AMNodeState.FORCED_ACTIVE, node.getState());
+ // Blacklisting should be ignored: the node would otherwise have been blacklisted, but it was
+ // not, because it is the only node for the source.
+ assertTrue(amNodeTracker.isBlacklistingIgnored(schedulerId));
+ }
+
+ private void _testNodeSelfBlacklist(AMNodeTracker amNodeTracker, TestEventHandler handler, int schedulerId) {
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, schedulerId));
NodeId nodeId = NodeId.newInstance("host1", 1234);
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
NodeId nodeId3 = NodeId.newInstance("host3", 1234);
NodeId nodeId4 = NodeId.newInstance("host4", 1234);
- amNodeTracker.nodeSeen(nodeId, 0);
- amNodeTracker.nodeSeen(nodeId2, 0);
- amNodeTracker.nodeSeen(nodeId3, 0);
- amNodeTracker.nodeSeen(nodeId4, 0);
- AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
-
+ amNodeTracker.nodeSeen(nodeId, schedulerId);
+ amNodeTracker.nodeSeen(nodeId2, schedulerId);
+ amNodeTracker.nodeSeen(nodeId3, schedulerId);
+ amNodeTracker.nodeSeen(nodeId4, schedulerId);
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId);
+
ContainerId cId1 = mock(ContainerId.class);
ContainerId cId2 = mock(ContainerId.class);
ContainerId cId3 = mock(ContainerId.class);
-
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3));
+
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId3));
assertEquals(3, node.containers.size());
-
+
TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
-
- amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1));
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, schedulerId, cId1, ta1));
assertEquals(1, node.numSuccessfulTAs);
-
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
assertEquals(1, node.numSuccessfulTAs);
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
// duplicate should not affect anything
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
assertEquals(1, node.numSuccessfulTAs);
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
-
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true));
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId3, ta3, true));
dispatcher.await();
assertEquals(1, node.numSuccessfulTAs);
assertEquals(2, node.numFailedTAs);
assertEquals(AMNodeState.BLACKLISTED, node.getState());
-
+
assertEquals(4, handler.events.size());
assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
@@ -246,20 +419,20 @@ public class TestAMNodeTracker {
ContainerId cId5 = mock(ContainerId.class);
TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
- AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0);
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5));
-
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true));
+ AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, schedulerId);
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId4));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId5));
+
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId4, ta4, true));
assertEquals(1, node2.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node2.getState());
-
+
handler.events.clear();
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId5, ta5, true));
dispatcher.await();
assertEquals(2, node2.numFailedTAs);
assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
- AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0);
+ AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, schedulerId);
assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
assertEquals(5, handler.events.size());
@@ -286,7 +459,7 @@ public class TestAMNodeTracker {
// Increase the number of nodes. BLACKLISTING should be re-enabled.
// Node 1 and Node 2 should go into BLACKLISTED state
handler.events.clear();
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0));
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, schedulerId));
dispatcher.await();
LOG.info(("Completed waiting for dispatcher to process all pending events"));
assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -317,7 +490,7 @@ public class TestAMNodeTracker {
assertEquals(4, numIgnoreBlacklistingDisabledEvents);
assertEquals(2, numBlacklistedEvents);
assertEquals(2, numNodeFailedEvents);
-
+
amNodeTracker.stop();
}
@@ -336,6 +509,4 @@ public class TestAMNodeTracker {
doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime();
return nodeReport;
}
-
- // TODO TEZ-2003. Add tests for multiple sources.
}