You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/08/03 13:57:27 UTC
hadoop git commit: YARN-6678. Handle IllegalStateException in Async Scheduling mode of CapacityScheduler. Contributed by Tao Yang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 79df1e750 -> f64cfeaf6
YARN-6678. Handle IllegalStateException in Async Scheduling mode of CapacityScheduler. Contributed by Tao Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f64cfeaf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f64cfeaf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f64cfeaf
Branch: refs/heads/trunk
Commit: f64cfeaf61ec65a465decdd8215f567d4e6677a9
Parents: 79df1e7
Author: Sunil G <su...@apache.org>
Authored: Thu Aug 3 19:27:10 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Thu Aug 3 19:27:10 2017 +0530
----------------------------------------------------------------------
.../scheduler/common/fica/FiCaSchedulerApp.java | 13 ++
.../TestCapacitySchedulerAsyncScheduling.java | 147 +++++++++++++++++++
2 files changed, 160 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f64cfeaf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index ad4c8ce..17bb104 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -426,6 +426,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// accepted & confirmed, it will become RESERVED state
if (schedulerContainer.getRmContainer().getState()
== RMContainerState.RESERVED) {
+ // Check if node currently reserved by other application, there may
+ // be some outdated proposals in async-scheduling environment
+ if (schedulerContainer.getRmContainer() != schedulerContainer
+ .getSchedulerNode().getReservedContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to re-reserve a container, but node "
+ + schedulerContainer.getSchedulerNode()
+ + " is already reserved by another container"
+ + schedulerContainer.getSchedulerNode()
+ .getReservedContainer().getContainerId());
+ }
+ return false;
+ }
// Set reReservation == true
reReservation = true;
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f64cfeaf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 0eb89d7..0c3130d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -41,20 +44,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TestCapacitySchedulerAsyncScheduling {
private final int GB = 1024;
@@ -257,6 +266,144 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop();
}
+ // Testcase for YARN-6678: an outdated re-reserve proposal must be rejected, not throw
+ @Test(timeout = 30000)
+ public void testCommitOutdatedReservedProposal() throws Exception {
+ // disable async-scheduling so the racy scenario can be driven step by step
+ Configuration disableAsyncConf = new Configuration(conf);
+ disableAsyncConf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+ // init RM & NMs & Nodes
+ final MockRM rm = new MockRM(disableAsyncConf);
+ rm.start();
+ final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+ final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+
+ // init scheduler nodes
+ int waitTime = 1000;
+ while (waitTime > 0 &&
+ ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+ .getNodeTracker().nodeCount() < 2) {
+ waitTime -= 10;
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(2,
+ ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+ .getNodeTracker().nodeCount());
+
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
+ final SchedulerNode sn1 =
+ ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId());
+ final SchedulerNode sn2 =
+ ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId());
+
+ // submit app1, am1 is running on nm1
+ RMApp app = rm.submitApp(200, "app", "user", null, "default");
+ final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+ // submit app2, am2 is running on nm1
+ RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
+ final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+ // allocate and launch 2 containers for app1
+ allocateAndLaunchContainers(am, nm1, rm, 1,
+ Resources.createResource(5 * GB), 0, 2);
+ allocateAndLaunchContainers(am, nm2, rm, 1,
+ Resources.createResource(5 * GB), 0, 3);
+
+ // nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
+ // app2-container_01/AM)
+ // nm2 runs 1 container(app1-container_03)
+ Assert.assertEquals(3, sn1.getNumContainers());
+ Assert.assertEquals(1, sn2.getNumContainers());
+
+ // reserve 1 container(app1-container_04) for app1 on nm1
+ ResourceRequest rr2 = ResourceRequest
+ .newInstance(Priority.newInstance(0), "*",
+ Resources.createResource(5 * GB), 1);
+ am.allocate(Arrays.asList(rr2), null);
+ nm1.nodeHeartbeat(true);
+ // wait app1-container_04 reserved on nm1
+ waitTime = 1000;
+ while (waitTime > 0 && sn1.getReservedContainer() == null) {
+ waitTime -= 10;
+ Thread.sleep(10);
+ }
+ Assert.assertNotNull(sn1.getReservedContainer());
+
+ final CapacityScheduler cs = (CapacityScheduler) scheduler;
+ final CapacityScheduler spyCs = Mockito.spy(cs);
+ final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
+ final AtomicBoolean isChecked = new AtomicBoolean(false);
+ // handle CapacityScheduler#tryCommit,
+ // reproduce the sequence that raised IllegalStateException before YARN-6678
+ Mockito.doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ ResourceCommitRequest request =
+ (ResourceCommitRequest) invocation.getArguments()[1];
+ if (request.getContainersToReserve().size() > 0 && isFirstReserve
+ .compareAndSet(true, false)) {
+ // release app1-container_03 on nm2
+ RMContainer killableContainer =
+ sn2.getCopiedListOfRunningContainers().get(0);
+ cs.completedContainer(killableContainer, ContainerStatus
+ .newInstance(killableContainer.getContainerId(),
+ ContainerState.COMPLETE, "",
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+ RMContainerEventType.KILL);
+ Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size());
+ // unreserve app1-container_04 on nm1
+ // and allocate app1-container_05 on nm2
+ cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode()));
+ int waitTime = 1000;
+ while (waitTime > 0
+ && sn2.getCopiedListOfRunningContainers().size() == 0) {
+ waitTime -= 10;
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size());
+ Assert.assertNull(sn1.getReservedContainer());
+
+ // reserve app2-container_02 on nm1
+ ResourceRequest rr3 = ResourceRequest
+ .newInstance(Priority.newInstance(0), "*",
+ Resources.createResource(5 * GB), 1);
+ am2.allocate(Arrays.asList(rr3), null);
+ cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+ waitTime = 1000;
+ while (waitTime > 0 && sn1.getReservedContainer() == null) {
+ waitTime -= 10;
+ Thread.sleep(10);
+ }
+ Assert.assertNotNull(sn1.getReservedContainer());
+
+ // call real apply
+ try {
+ cs.tryCommit((Resource) invocation.getArguments()[0],
+ (ResourceCommitRequest) invocation.getArguments()[1]);
+ } catch (Exception e) {
+ throw new AssertionError("Committing outdated reserve proposal failed", e);
+ }
+ isChecked.set(true);
+ } else {
+ cs.tryCommit((Resource) invocation.getArguments()[0],
+ (ResourceCommitRequest) invocation.getArguments()[1]);
+ }
+ return null;
+ }
+ }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+ Mockito.any(ResourceCommitRequest.class));
+
+ spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+ waitTime = 1000;
+ while (waitTime > 0 && !isChecked.get()) {
+ waitTime -= 10;
+ Thread.sleep(10);
+ }
+ Assert.assertTrue("Outdated reserve proposal was never committed", isChecked.get());
+ rm.stop();
+ }
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org