You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/26 04:17:49 UTC

git commit: TEZ-891. TestTaskScheduler does not handle mockApp being called on different thread (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master a5713a41f -> f2f31e0ef


TEZ-891. TestTaskScheduler does not handle mockApp being called on different thread (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f2f31e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f2f31e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f2f31e0e

Branch: refs/heads/master
Commit: f2f31e0eff3f654d51ca7001a3bf1784cf8d5e0b
Parents: a5713a4
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 25 19:17:36 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 25 19:17:36 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  3 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 33 +++++++++++---------
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  5 ++-
 3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 775d342..c50e3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -68,7 +68,6 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /* TODO not yet updating cluster nodes on every allocate response
@@ -787,12 +786,14 @@ public class TaskScheduler extends AbstractService
   }
 
   public synchronized void blacklistNode(NodeId nodeId) {
+    LOG.info("Blacklisting node: " + nodeId);
     amRmClient.addNodeToBlacklist(nodeId);
     blacklistedNodes.add(nodeId);
   }
   
   public synchronized void unblacklistNode(NodeId nodeId) {
     if (blacklistedNodes.remove(nodeId)) {
+      LOG.info("UnBlacklisting node: " + nodeId);
       amRmClient.removeNodeFromBlacklist(nodeId);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 150ef7a..63cc476 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -491,7 +491,7 @@ public class TestTaskScheduler {
       new TaskSchedulerWithDrainableAppCallback(
         mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
         appUrl, mockRMClient, mockAppContext);
-    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+    final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
         .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
@@ -520,6 +520,7 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
 
     Object mockTask1 = mock(Object.class);
+    when(mockTask1.toString()).thenReturn("task1");
     Object mockCookie1 = mock(Object.class);
     Resource mockCapability = mock(Resource.class);
     String[] hosts = {"host1", "host5"};
@@ -529,8 +530,10 @@ public class TestTaskScheduler {
     final Priority mockPriority3 = Priority.newInstance(3);
     final Priority mockPriority4 = Priority.newInstance(4);
     Object mockTask2 = mock(Object.class);
+    when(mockTask2.toString()).thenReturn("task2");
     Object mockCookie2 = mock(Object.class);
     Object mockTask3 = mock(Object.class);
+    when(mockTask3.toString()).thenReturn("task3");
     Object mockCookie3 = mock(Object.class);
     ArgumentCaptor<CookieContainerRequest> requestCaptor =
         ArgumentCaptor.forClass(CookieContainerRequest.class);
@@ -558,38 +561,43 @@ public class TestTaskScheduler {
     // sending lower priority container first to make sure its not matched
     Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
+    when(mockContainer4.toString()).thenReturn("container4");
     when(mockContainer4.getPriority()).thenReturn(mockPriority4);
     ContainerId mockCId4 = mock(ContainerId.class);
     when(mockContainer4.getId()).thenReturn(mockCId4);
+    when(mockCId4.toString()).thenReturn("container4");
     containers.add(mockContainer4);
     Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
     when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+    when(mockContainer1.toString()).thenReturn("container1");
     ContainerId mockCId1 = mock(ContainerId.class);
     when(mockContainer1.getId()).thenReturn(mockCId1);
+    when(mockCId1.toString()).thenReturn("container1");
     containers.add(mockContainer1);
     Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
     when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+    when(mockContainer2.toString()).thenReturn("container2");
     ContainerId mockCId2 = mock(ContainerId.class);
     when(mockContainer2.getId()).thenReturn(mockCId2);
+    when(mockCId2.toString()).thenReturn("container2");
     containers.add(mockContainer2);
     Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
     when(mockContainer3.getPriority()).thenReturn(mockPriority3);
+    when(mockContainer3.toString()).thenReturn("container3");
     ContainerId mockCId3 = mock(ContainerId.class);
     when(mockContainer3.getId()).thenReturn(mockCId3);
+    when(mockCId3.toString()).thenReturn("container3");
     containers.add(mockContainer3);
 
     ArrayList<CookieContainerRequest> hostContainers =
                              new ArrayList<CookieContainerRequest>();
     hostContainers.add(request1);
-    hostContainers.add(request2);
-    hostContainers.add(request3);
     ArrayList<CookieContainerRequest> rackContainers =
                              new ArrayList<CookieContainerRequest>();
     rackContainers.add(request2);
-    rackContainers.add(request3);
     ArrayList<CookieContainerRequest> anyContainers =
                              new ArrayList<CookieContainerRequest>();
     anyContainers.add(request3);
@@ -605,7 +613,7 @@ public class TestTaskScheduler {
     anyList.add(anyContainers);
     final List<ArrayList<CookieContainerRequest>> emptyList =
                         new LinkedList<ArrayList<CookieContainerRequest>>();
-    // return all requests for host1
+    // return pri1 requests for host1
     when(
         mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
             (Resource) any())).thenAnswer(
@@ -617,7 +625,6 @@ public class TestTaskScheduler {
           }
 
         });
-    // first request matched by host
     // second request matched to rack. RackResolver by default puts hosts in
     // /default-rack. We need to workaround by returning rack matches only once
     when(
@@ -660,19 +667,12 @@ public class TestTaskScheduler {
 
         });
     
-    final AtomicInteger count = new AtomicInteger(0);
-    Mockito.doAnswer(new Answer() {
-      public Object answer(InvocationOnMock invocation) {
-        count.incrementAndGet();
-        return null;
-      }})
-    .when(mockApp).taskAllocated(any(), any(), (Container)any());
     when(mockRMClient.getTopPriority()).then(        
         new Answer<Priority>() {
           @Override
           public Priority answer(
               InvocationOnMock invocation) throws Throwable {
-            int allocations = count.get();
+            int allocations = drainableAppCallback.count.get();
             if (allocations == 0) {
               return mockPriority1;
             }
@@ -751,6 +751,7 @@ public class TestTaskScheduler {
     scheduler.blacklistNode(badNodeId);
     verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
     Object mockTask4 = mock(Object.class);
+    when(mockTask4.toString()).thenReturn("task4");
     Object mockCookie4 = mock(Object.class);
     scheduler.allocateTask(mockTask4, mockCapability, null,
         null, mockPriority4, null, mockCookie4);
@@ -763,6 +764,8 @@ public class TestTaskScheduler {
     when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
     when(mockContainer5.getNodeId()).thenReturn(badNodeId);
     ContainerId mockCId5 = mock(ContainerId.class);
+    when(mockContainer5.toString()).thenReturn("container5");
+    when(mockCId5.toString()).thenReturn("container5");
     when(mockContainer5.getId()).thenReturn(mockCId5);
     when(mockContainer5.getPriority()).thenReturn(mockPriority4);
     containers.clear();
@@ -804,6 +807,8 @@ public class TestTaskScheduler {
     when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
     ContainerId mockCId6 = mock(ContainerId.class);
     when(mockContainer6.getId()).thenReturn(mockCId6);
+    when(mockContainer6.toString()).thenReturn("container6");
+    when(mockCId6.toString()).thenReturn("container6");
     containers.clear();
     containers.add(mockContainer6);
     when(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 6058f88..f7601a6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -226,7 +227,8 @@ class TestTaskSchedulerHelpers {
     int invocations;
     private TaskSchedulerAppCallback real;
     private CompletionService completionService;
-
+    final AtomicInteger count = new AtomicInteger(0);
+    
     public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
       completionService = real.completionService;
       this.real = real;
@@ -234,6 +236,7 @@ class TestTaskSchedulerHelpers {
 
     @Override
     public void taskAllocated(Object task, Object appCookie, Container container) {
+      count.incrementAndGet();
       invocations++;
       real.taskAllocated(task, appCookie, container);
     }