You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:37 UTC

[38/50] [abbrv] tez git commit: TEZ-2713. Add tests for node handling when there's multiple schedulers. (sseth)

TEZ-2713. Add tests for node handling when there's multiple schedulers. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ccdf936
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ccdf936
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ccdf936

Branch: refs/heads/master
Commit: 4ccdf9362029b2fa21bf91719fd93d40e2b3465f
Parents: c03a6ff
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 12 10:24:33 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:15:23 2015 -0700

----------------------------------------------------------------------
 .../tez/dag/app/rm/node/AMNodeTracker.java      |   8 +
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  | 275 +++++++++++++++----
 2 files changed, 231 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ccdf936/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 751276e..1aa8472 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -116,6 +116,14 @@ public class AMNodeTracker extends AbstractService implements
     return perSourceNodeTrackers.get(schedulerId).get(nodeId);
   }
 
+  /**
+   * Retrieve the number of nodes from this source on which containers may be running.
+   *
+   * This number may differ from the total number of nodes available from the source.
+   *
+   * @param schedulerId the schedulerId for which the node count is required
+   * @return the number of nodes from the scheduler on which containers have been allocated
+   */
   public int getNumNodes(int schedulerId) {
     return perSourceNodeTrackers.get(schedulerId).getNumNodes();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ccdf936/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 84d2e1f..def80da 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -19,6 +19,9 @@
 package org.apache.tez.dag.app.rm.node;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
@@ -123,6 +126,55 @@ public class TestAMNodeTracker {
   }
 
   @Test (timeout = 5000)
+  public void testMultipleSourcesNodeRegistration() {
+    AppContext appContext = mock(AppContext.class);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+
+    amNodeTracker.init(new Configuration(false));
+    amNodeTracker.start();
+
+    NodeId nodeId1 = NodeId.newInstance("source01", 3333);
+    NodeId nodeId2 = NodeId.newInstance("source02", 3333);
+
+    amNodeTracker.nodeSeen(nodeId1, 0);
+    amNodeTracker.nodeSeen(nodeId2, 1);
+
+    assertEquals(1, amNodeTracker.getNumNodes(0));
+    assertEquals(1, amNodeTracker.getNumNodes(1));
+    assertNotNull(amNodeTracker.get(nodeId1, 0));
+    assertNull(amNodeTracker.get(nodeId2, 0));
+    assertNull(amNodeTracker.get(nodeId1, 1));
+    assertNotNull(amNodeTracker.get(nodeId2, 1));
+  }
+
+  @Test (timeout = 5000)
+  public void testMultipleSourcesNodeCountUpdated() {
+    AppContext appContext = mock(AppContext.class);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+
+    amNodeTracker.init(new Configuration(false));
+    amNodeTracker.start();
+
+    NodeId nodeId1 = NodeId.newInstance("source01", 3333);
+    NodeId nodeId2 = NodeId.newInstance("source02", 3333);
+
+    amNodeTracker.nodeSeen(nodeId1, 0);
+    amNodeTracker.nodeSeen(nodeId2, 1);
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(10, 0));
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(20, 1));
+
+    // A NodeCountUpdated event is not reflected in getNumNodes.
+    assertEquals(1, amNodeTracker.getNumNodes(0));
+    assertEquals(1, amNodeTracker.getNumNodes(1));
+    assertNotNull(amNodeTracker.get(nodeId1, 0));
+    assertNull(amNodeTracker.get(nodeId2, 0));
+    assertNull(amNodeTracker.get(nodeId1, 1));
+    assertNotNull(amNodeTracker.get(nodeId2, 1));
+  }
+
+  @Test (timeout = 5000)
   public void testSingleNodeNotBlacklisted() {
     AppContext appContext = mock(AppContext.class);
     Configuration conf = new Configuration(false);
@@ -142,32 +194,61 @@ public class TestAMNodeTracker {
     amNodeTracker.init(conf);
     amNodeTracker.start();
 
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
-    NodeId nodeId = NodeId.newInstance("host1", 1234);
-    amNodeTracker.nodeSeen(nodeId, 0);
+    _testSingleNodeNotBlacklisted(amNodeTracker, handler, 0);
+  }
 
-    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
+  @Test (timeout = 5000)
+  public void testSingleNodeNotBlacklistedAlternateScheduler() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33);
 
-    ContainerId cId1 = mock(ContainerId.class);
-    ContainerId cId2 = mock(ContainerId.class);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
 
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
+    _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1);
+  }
 
-    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
-    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+  @Test (timeout = 5000)
+  public void testSingleNodeNotBlacklistedAlternateScheduler2() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33);
 
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true));
-    dispatcher.await();
-    assertEquals(1, node.numFailedTAs);
-    assertEquals(AMNodeState.ACTIVE, node.getState());
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
 
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
-    dispatcher.await();
-    assertEquals(2, node.numFailedTAs);
-    assertEquals(1, handler.events.size());
-    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType());
-    assertEquals(AMNodeState.FORCED_ACTIVE, node.getState());
+    // Register multiple nodes from a scheduler which isn't being tested.
+    // This should not affect the blacklisting behaviour
+    for (int i = 0 ; i < 10 ; i++) {
+      amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0);
+    }
+
+    _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1);
+    // No impact on blacklisting for the alternate source
+    assertFalse(amNodeTracker.isBlacklistingIgnored(0));
   }
 
   @Test(timeout=10000)
@@ -186,50 +267,142 @@ public class TestAMNodeTracker {
     dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
     amNodeTracker.init(conf);
     amNodeTracker.start();
+    try {
+      _testNodeSelfBlacklist(amNodeTracker, handler, 0);
+    } finally {
+      amNodeTracker.stop();
+    }
+  }
 
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0));
+  @Test(timeout=10000)
+  public void testNodeSelfBlacklistAlternateScheduler1() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+    try {
+      _testNodeSelfBlacklist(amNodeTracker, handler, 1);
+    } finally {
+      amNodeTracker.stop();
+    }
+  }
+
+  @Test(timeout=10000)
+  public void testNodeSelfBlacklistAlternateScheduler2() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+    try {
+      // Register multiple nodes from a scheduler which isn't being tested.
+      // This should not affect the blacklisting behaviour
+      for (int i = 0 ; i < 100 ; i++) {
+        amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0);
+      }
+      _testNodeSelfBlacklist(amNodeTracker, handler, 1);
+      assertFalse(amNodeTracker.isBlacklistingIgnored(0));
+    } finally {
+      amNodeTracker.stop();
+    }
+  }
+
+  private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker,
+                                             TestEventHandler handler, int schedulerId) {
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId));
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    amNodeTracker.nodeSeen(nodeId, schedulerId);
+
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId);
+
+    ContainerId cId1 = mock(ContainerId.class);
+    ContainerId cId2 = mock(ContainerId.class);
+
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2));
+
+    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId1, ta1, true));
+    dispatcher.await();
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
+    dispatcher.await();
+    assertEquals(2, node.numFailedTAs);
+    assertEquals(1, handler.events.size());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType());
+    assertEquals(AMNodeState.FORCED_ACTIVE, node.getState());
+    // Blacklisting should be ignored: the node would otherwise have been blacklisted, but it
+    // has not been, because it is the only node for the source
+    assertTrue(amNodeTracker.isBlacklistingIgnored(schedulerId));
+  }
+
+  private void _testNodeSelfBlacklist(AMNodeTracker amNodeTracker, TestEventHandler handler, int schedulerId) {
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, schedulerId));
     NodeId nodeId = NodeId.newInstance("host1", 1234);
     NodeId nodeId2 = NodeId.newInstance("host2", 1234);
     NodeId nodeId3 = NodeId.newInstance("host3", 1234);
     NodeId nodeId4 = NodeId.newInstance("host4", 1234);
-    amNodeTracker.nodeSeen(nodeId, 0);
-    amNodeTracker.nodeSeen(nodeId2, 0);
-    amNodeTracker.nodeSeen(nodeId3, 0);
-    amNodeTracker.nodeSeen(nodeId4, 0);
-    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
-    
+    amNodeTracker.nodeSeen(nodeId, schedulerId);
+    amNodeTracker.nodeSeen(nodeId2, schedulerId);
+    amNodeTracker.nodeSeen(nodeId3, schedulerId);
+    amNodeTracker.nodeSeen(nodeId4, schedulerId);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId);
+
     ContainerId cId1 = mock(ContainerId.class);
     ContainerId cId2 = mock(ContainerId.class);
     ContainerId cId3 = mock(ContainerId.class);
-    
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3));
+
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId3));
     assertEquals(3, node.containers.size());
-    
+
     TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
-    
-    amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1));
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, schedulerId, cId1, ta1));
     assertEquals(1, node.numSuccessfulTAs);
-    
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
     // duplicate should not affect anything
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true));
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
-    
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true));
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId3, ta3, true));
     dispatcher.await();
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(2, node.numFailedTAs);
     assertEquals(AMNodeState.BLACKLISTED, node.getState());
-    
+
     assertEquals(4, handler.events.size());
     assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
     assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
@@ -246,20 +419,20 @@ public class TestAMNodeTracker {
     ContainerId cId5 = mock(ContainerId.class);
     TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
-    AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0);
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5));
-    
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true));
+    AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, schedulerId);
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId4));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId5));
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId4, ta4, true));
     assertEquals(1, node2.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node2.getState());
-    
+
     handler.events.clear();
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId5, ta5, true));
     dispatcher.await();
     assertEquals(2, node2.numFailedTAs);
     assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
-    AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0);
+    AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, schedulerId);
     assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
     assertEquals(5, handler.events.size());
 
@@ -286,7 +459,7 @@ public class TestAMNodeTracker {
     // Increase the number of nodes. BLACKLISTING should be re-enabled.
     // Node 1 and Node 2 should go into BLACKLISTED state
     handler.events.clear();
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0));
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, schedulerId));
     dispatcher.await();
     LOG.info(("Completed waiting for dispatcher to process all pending events"));
     assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -317,7 +490,7 @@ public class TestAMNodeTracker {
     assertEquals(4, numIgnoreBlacklistingDisabledEvents);
     assertEquals(2, numBlacklistedEvents);
     assertEquals(2, numNodeFailedEvents);
-    
+
     amNodeTracker.stop();
   }
 
@@ -336,6 +509,4 @@ public class TestAMNodeTracker {
     doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime();
     return nodeReport;
   }
-
-  // TODO TEZ-2003. Add tests for multiple sources.
 }