You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/10/27 19:05:08 UTC
hadoop git commit: YARN-5308. FairScheduler: Move continuous
scheduling related tests to TestContinuousScheduling (Kai Sasaki via Varun
Saxena)
Repository: hadoop
Updated Branches:
refs/heads/trunk ac35ee939 -> 79aeddc88
YARN-5308. FairScheduler: Move continuous scheduling related tests to TestContinuousScheduling (Kai Sasaki via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79aeddc8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79aeddc8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79aeddc8
Branch: refs/heads/trunk
Commit: 79aeddc88f0e71f0031d5f39fded172de0b29a2e
Parents: ac35ee9
Author: Varun Saxena <va...@apache.org>
Authored: Fri Oct 28 00:29:53 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Oct 28 00:34:50 2016 +0530
----------------------------------------------------------------------
.../fair/TestContinuousScheduling.java | 189 ++++++++++++++++++-
.../scheduler/fair/TestFairScheduler.java | 157 ---------------
2 files changed, 187 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79aeddc8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index 6188246..5964d2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -22,20 +22,32 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
import org.junit.Before;
import org.junit.Test;
@@ -43,18 +55,22 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class TestContinuousScheduling extends FairSchedulerTestBase {
private ControlledClock mockClock;
+ private static int delayThresholdTimeMs = 1000;
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
- conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, 100);
- conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 100);
+ conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
+ delayThresholdTimeMs);
+ conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
+ delayThresholdTimeMs);
return conf;
}
@@ -167,6 +183,175 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
Assert.assertEquals(2, nodes.size());
}
+ @Test
+ public void testWithNodeRemoved() throws Exception {
+ // Disable continuous scheduling, will invoke continuous
+ // scheduling once manually
+ scheduler = new FairScheduler();
+ conf = super.createConfiguration();
+ resourceManager = new MockRM(conf);
+
+ // TODO: This test should really be using MockRM. For now starting stuff
+ // that is needed at a bare minimum.
+ ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+ resourceManager.getRMContext().getStateStore().start();
+
+ // to initialize the master key
+ resourceManager.getRMContext().getContainerTokenSecretManager()
+ .rollMasterKey();
+
+ scheduler.setRMContext(resourceManager.getRMContext());
+ Assert.assertTrue("Continuous scheduling should be disabled.",
+ !scheduler.isContinuousSchedulingEnabled());
+ scheduler.init(conf);
+ scheduler.start();
+
+ // Add two nodes
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+ Assert.assertEquals("We should have two alive nodes.",
+ 2, scheduler.getNumClusterNodes());
+
+ // Remove one node
+ NodeRemovedSchedulerEvent removeNode1
+ = new NodeRemovedSchedulerEvent(node1);
+ scheduler.handle(removeNode1);
+ Assert.assertEquals("We should only have one alive node.",
+ 1, scheduler.getNumClusterNodes());
+
+ // Invoke the continuous scheduling once
+ try {
+ scheduler.continuousSchedulingAttempt();
+ } catch (Exception e) {
+ fail("Exception happened when doing continuous scheduling. " +
+ e.toString());
+ }
+ }
+
+ @Test
+ public void testInterruptedException()
+ throws Exception {
+ // Disable continuous scheduling, will invoke continuous
+ // scheduling once manually
+ scheduler = new FairScheduler();
+ conf = super.createConfiguration();
+ resourceManager = new MockRM(conf);
+
+ // TODO: This test should really be using MockRM. For now starting stuff
+ // that is needed at a bare minimum.
+ ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+ resourceManager.getRMContext().getStateStore().start();
+
+ // to initialize the master key
+ resourceManager.getRMContext().getContainerTokenSecretManager()
+ .rollMasterKey();
+
+ scheduler.setRMContext(resourceManager.getRMContext());
+ scheduler.init(conf);
+ scheduler.start();
+ FairScheduler spyScheduler = spy(scheduler);
+ Assert.assertTrue("Continuous scheduling should be disabled.",
+ !spyScheduler.isContinuousSchedulingEnabled());
+ // Add one node
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ spyScheduler.handle(nodeEvent1);
+ Assert.assertEquals("We should have one alive node.",
+ 1, spyScheduler.getNumClusterNodes());
+ InterruptedException ie = new InterruptedException();
+ doThrow(new YarnRuntimeException(ie)).when(spyScheduler).
+ attemptScheduling(isA(FSSchedulerNode.class));
+ // Invoke the continuous scheduling once
+ try {
+ spyScheduler.continuousSchedulingAttempt();
+ fail("Expected InterruptedException to stop schedulingThread");
+ } catch (InterruptedException e) {
+ Assert.assertEquals(ie, e);
+ }
+ }
+
+ @Test
+ public void testThreadLifeCycle() throws InterruptedException {
+ // Both scheduler threads must be alive after start() and exit after stop().
+ scheduler.start();
+ Thread updateThread = scheduler.updateThread;
+ Thread schedulingThread = scheduler.schedulingThread;
+
+ assertTrue(updateThread.isAlive());
+ assertTrue(schedulingThread.isAlive());
+ scheduler.stop();
+ // Poll up to 100 times, 50 ms apart, until both threads have exited.
+ int numRetries = 100;
+ while (numRetries-- > 0 &&
+ (updateThread.isAlive() || schedulingThread.isAlive())) {
+ Thread.sleep(50);
+ }
+ // Check liveness itself: numRetries is -1 after a timeout, never 0.
+ assertTrue("One of the threads is still alive",
+ !updateThread.isAlive() && !schedulingThread.isAlive());
+ }
+
+ @Test
+ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
+ scheduler.start();
+
+ int priorityValue;
+ Priority priority;
+ FSAppAttempt fsAppAttempt;
+ ResourceRequest request1;
+ ResourceRequest request2;
+ ApplicationAttemptId id11;
+
+ priorityValue = 1;
+ id11 = createAppAttemptId(1, 1);
+ createMockRMApp(id11);
+ priority = Priority.newInstance(priorityValue);
+ scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
+ false);
+ scheduler.addApplicationAttempt(id11, false, false);
+ fsAppAttempt = scheduler.getApplicationAttempt(id11);
+
+ String hostName = "127.0.0.1";
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
+ hostName);
+ List<ResourceRequest> ask1 = new ArrayList<>();
+ request1 =
+ createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
+ true);
+ request2 =
+ createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
+ true);
+ ask1.add(request1);
+ ask1.add(request2);
+ scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
+ null, null);
+
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+ FSSchedulerNode node =
+ (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
+ // Tick the clock so that the fsApp startTime differs from the initScheduler
+ // time
+ mockClock.tickSec(delayThresholdTimeMs / 1000);
+ scheduler.attemptScheduling(node);
+ Map<SchedulerRequestKey, Long> lastScheduledContainer =
+ fsAppAttempt.getLastScheduledContainer();
+ long initSchedulerTime =
+ lastScheduledContainer.get(TestUtils.toSchedulerKey(priority));
+ assertEquals(delayThresholdTimeMs, initSchedulerTime);
+ }
+
private void triggerSchedulingAttempt() throws InterruptedException {
Thread.sleep(
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79aeddc8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 9e0dd06..e28b35a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -25,10 +25,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -62,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -99,12 +95,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -4125,71 +4118,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
@Test
- public void testContinuousSchedulingWithNodeRemoved() throws Exception {
- // Disable continuous scheduling, will invoke continuous scheduling once manually
- scheduler.init(conf);
- scheduler.start();
- Assert.assertTrue("Continuous scheduling should be disabled.",
- !scheduler.isContinuousSchedulingEnabled());
-
- // Add two nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
- Assert.assertEquals("We should have two alive nodes.",
- 2, scheduler.getNumClusterNodes());
-
- // Remove one node
- NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
- scheduler.handle(removeNode1);
- Assert.assertEquals("We should only have one alive node.",
- 1, scheduler.getNumClusterNodes());
-
- // Invoke the continuous scheduling once
- try {
- scheduler.continuousSchedulingAttempt();
- } catch (Exception e) {
- fail("Exception happened when doing continuous scheduling. " +
- e.toString());
- }
- }
-
- @Test
- public void testContinuousSchedulingInterruptedException()
- throws Exception {
- scheduler.init(conf);
- scheduler.start();
- FairScheduler spyScheduler = spy(scheduler);
- Assert.assertTrue("Continuous scheduling should be disabled.",
- !spyScheduler.isContinuousSchedulingEnabled());
- // Add one nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- spyScheduler.handle(nodeEvent1);
- Assert.assertEquals("We should have one alive node.",
- 1, spyScheduler.getNumClusterNodes());
- InterruptedException ie = new InterruptedException();
- doThrow(new YarnRuntimeException(ie)).when(spyScheduler).
- attemptScheduling(isA(FSSchedulerNode.class));
- // Invoke the continuous scheduling once
- try {
- spyScheduler.continuousSchedulingAttempt();
- fail("Expected InterruptedException to stop schedulingThread");
- } catch (InterruptedException e) {
- Assert.assertEquals(ie, e);
- }
- }
-
- @Test
public void testSchedulingOnRemovedNode() throws Exception {
// Disable continuous scheduling, will invoke continuous scheduling manually
scheduler.init(conf);
@@ -4487,30 +4415,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
@Test
- public void testThreadLifeCycle() throws InterruptedException {
- conf.setBoolean(
- FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
- scheduler.init(conf);
- scheduler.start();
-
- Thread updateThread = scheduler.updateThread;
- Thread schedulingThread = scheduler.schedulingThread;
-
- assertTrue(updateThread.isAlive());
- assertTrue(schedulingThread.isAlive());
-
- scheduler.stop();
-
- int numRetries = 100;
- while (numRetries-- > 0 &&
- (updateThread.isAlive() || schedulingThread.isAlive())) {
- Thread.sleep(50);
- }
-
- assertNotEquals("One of the threads is still alive", 0, numRetries);
- }
-
- @Test
public void testPerfMetricsInited() {
scheduler.init(conf);
scheduler.start();
@@ -4645,67 +4549,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
@Test
- public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
- int DELAY_THRESHOLD_TIME_MS = 1000;
- conf.set(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, "true");
- conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
- String.valueOf(DELAY_THRESHOLD_TIME_MS));
- conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
- String.valueOf(DELAY_THRESHOLD_TIME_MS));
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
- scheduler.init(conf);
- scheduler.start();
-
- int priorityValue;
- Priority priority;
- FSAppAttempt fsAppAttempt;
- ResourceRequest request1;
- ResourceRequest request2;
- ApplicationAttemptId id11;
-
- priorityValue = 1;
- id11 = createAppAttemptId(1, 1);
- createMockRMApp(id11);
- priority = Priority.newInstance(priorityValue);
- scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
- false);
- scheduler.addApplicationAttempt(id11, false, false);
- fsAppAttempt = scheduler.getApplicationAttempt(id11);
-
- String hostName = "127.0.0.1";
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
- hostName);
- List<ResourceRequest> ask1 = new ArrayList<>();
- request1 =
- createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
- true);
- request2 =
- createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
- true);
- ask1.add(request1);
- ask1.add(request2);
- scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
- null, null);
-
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
- FSSchedulerNode node =
- (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
- // Tick the time and let the fsApp startTime different from initScheduler
- // time
- clock.tickSec(DELAY_THRESHOLD_TIME_MS / 1000);
- scheduler.attemptScheduling(node);
- Map<SchedulerRequestKey, Long> lastScheduledContainer =
- fsAppAttempt.getLastScheduledContainer();
- long initSchedulerTime =
- lastScheduledContainer.get(TestUtils.toSchedulerKey(priority));
- assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
- }
-
- @Test
public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
// to have 0 available resource
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org