You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/26 04:17:49 UTC
git commit: TEZ-891. TestTaskScheduler does not handle mockApp being
called on different thread (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master a5713a41f -> f2f31e0ef
TEZ-891. TestTaskScheduler does not handle mockApp being called on different thread (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f2f31e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f2f31e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f2f31e0e
Branch: refs/heads/master
Commit: f2f31e0eff3f654d51ca7001a3bf1784cf8d5e0b
Parents: a5713a4
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 25 19:17:36 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 25 19:17:36 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/TaskScheduler.java | 3 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 33 +++++++++++---------
.../dag/app/rm/TestTaskSchedulerHelpers.java | 5 ++-
3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 775d342..c50e3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -68,7 +68,6 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/* TODO not yet updating cluster nodes on every allocate response
@@ -787,12 +786,14 @@ public class TaskScheduler extends AbstractService
}
public synchronized void blacklistNode(NodeId nodeId) {
+ LOG.info("Blacklisting node: " + nodeId);
amRmClient.addNodeToBlacklist(nodeId);
blacklistedNodes.add(nodeId);
}
public synchronized void unblacklistNode(NodeId nodeId) {
if (blacklistedNodes.remove(nodeId)) {
+ LOG.info("UnBlacklisting node: " + nodeId);
amRmClient.removeNodeFromBlacklist(nodeId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 150ef7a..63cc476 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -491,7 +491,7 @@ public class TestTaskScheduler {
new TaskSchedulerWithDrainableAppCallback(
mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
appUrl, mockRMClient, mockAppContext);
- TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+ final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
.getDrainableAppCallback();
Configuration conf = new Configuration();
@@ -520,6 +520,7 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
Object mockTask1 = mock(Object.class);
+ when(mockTask1.toString()).thenReturn("task1");
Object mockCookie1 = mock(Object.class);
Resource mockCapability = mock(Resource.class);
String[] hosts = {"host1", "host5"};
@@ -529,8 +530,10 @@ public class TestTaskScheduler {
final Priority mockPriority3 = Priority.newInstance(3);
final Priority mockPriority4 = Priority.newInstance(4);
Object mockTask2 = mock(Object.class);
+ when(mockTask2.toString()).thenReturn("task2");
Object mockCookie2 = mock(Object.class);
Object mockTask3 = mock(Object.class);
+ when(mockTask3.toString()).thenReturn("task3");
Object mockCookie3 = mock(Object.class);
ArgumentCaptor<CookieContainerRequest> requestCaptor =
ArgumentCaptor.forClass(CookieContainerRequest.class);
@@ -558,38 +561,43 @@ public class TestTaskScheduler {
// sending lower priority container first to make sure it's not matched
Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
+ when(mockContainer4.toString()).thenReturn("container4");
when(mockContainer4.getPriority()).thenReturn(mockPriority4);
ContainerId mockCId4 = mock(ContainerId.class);
when(mockContainer4.getId()).thenReturn(mockCId4);
+ when(mockCId4.toString()).thenReturn("container4");
containers.add(mockContainer4);
Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
when(mockContainer1.getPriority()).thenReturn(mockPriority1);
+ when(mockContainer1.toString()).thenReturn("container1");
ContainerId mockCId1 = mock(ContainerId.class);
when(mockContainer1.getId()).thenReturn(mockCId1);
+ when(mockCId1.toString()).thenReturn("container1");
containers.add(mockContainer1);
Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+ when(mockContainer2.toString()).thenReturn("container2");
ContainerId mockCId2 = mock(ContainerId.class);
when(mockContainer2.getId()).thenReturn(mockCId2);
+ when(mockCId2.toString()).thenReturn("container2");
containers.add(mockContainer2);
Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
when(mockContainer3.getPriority()).thenReturn(mockPriority3);
+ when(mockContainer3.toString()).thenReturn("container3");
ContainerId mockCId3 = mock(ContainerId.class);
when(mockContainer3.getId()).thenReturn(mockCId3);
+ when(mockCId3.toString()).thenReturn("container3");
containers.add(mockContainer3);
ArrayList<CookieContainerRequest> hostContainers =
new ArrayList<CookieContainerRequest>();
hostContainers.add(request1);
- hostContainers.add(request2);
- hostContainers.add(request3);
ArrayList<CookieContainerRequest> rackContainers =
new ArrayList<CookieContainerRequest>();
rackContainers.add(request2);
- rackContainers.add(request3);
ArrayList<CookieContainerRequest> anyContainers =
new ArrayList<CookieContainerRequest>();
anyContainers.add(request3);
@@ -605,7 +613,7 @@ public class TestTaskScheduler {
anyList.add(anyContainers);
final List<ArrayList<CookieContainerRequest>> emptyList =
new LinkedList<ArrayList<CookieContainerRequest>>();
- // return all requests for host1
+ // return pri1 requests for host1
when(
mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
(Resource) any())).thenAnswer(
@@ -617,7 +625,6 @@ public class TestTaskScheduler {
}
});
- // first request matched by host
// second request matched to rack. RackResolver by default puts hosts in
// /default-rack. We need to work around this by returning rack matches only once
when(
@@ -660,19 +667,12 @@ public class TestTaskScheduler {
});
- final AtomicInteger count = new AtomicInteger(0);
- Mockito.doAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- count.incrementAndGet();
- return null;
- }})
- .when(mockApp).taskAllocated(any(), any(), (Container)any());
when(mockRMClient.getTopPriority()).then(
new Answer<Priority>() {
@Override
public Priority answer(
InvocationOnMock invocation) throws Throwable {
- int allocations = count.get();
+ int allocations = drainableAppCallback.count.get();
if (allocations == 0) {
return mockPriority1;
}
@@ -751,6 +751,7 @@ public class TestTaskScheduler {
scheduler.blacklistNode(badNodeId);
verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
Object mockTask4 = mock(Object.class);
+ when(mockTask4.toString()).thenReturn("task4");
Object mockCookie4 = mock(Object.class);
scheduler.allocateTask(mockTask4, mockCapability, null,
null, mockPriority4, null, mockCookie4);
@@ -763,6 +764,8 @@ public class TestTaskScheduler {
when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
when(mockContainer5.getNodeId()).thenReturn(badNodeId);
ContainerId mockCId5 = mock(ContainerId.class);
+ when(mockContainer5.toString()).thenReturn("container5");
+ when(mockCId5.toString()).thenReturn("container5");
when(mockContainer5.getId()).thenReturn(mockCId5);
when(mockContainer5.getPriority()).thenReturn(mockPriority4);
containers.clear();
@@ -804,6 +807,8 @@ public class TestTaskScheduler {
when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
ContainerId mockCId6 = mock(ContainerId.class);
when(mockContainer6.getId()).thenReturn(mockCId6);
+ when(mockContainer6.toString()).thenReturn("container6");
+ when(mockCId6.toString()).thenReturn("container6");
containers.clear();
containers.add(mockContainer6);
when(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 6058f88..f7601a6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -226,7 +227,8 @@ class TestTaskSchedulerHelpers {
int invocations;
private TaskSchedulerAppCallback real;
private CompletionService completionService;
-
+ final AtomicInteger count = new AtomicInteger(0);
+
public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
completionService = real.completionService;
this.real = real;
@@ -234,6 +236,7 @@ class TestTaskSchedulerHelpers {
@Override
public void taskAllocated(Object task, Object appCookie, Container container) {
+ count.incrementAndGet();
invocations++;
real.taskAllocated(task, appCookie, container);
}