You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/07/11 16:25:04 UTC
[18/50] [abbrv] hadoop git commit: YARN-6714. IllegalStateException
while handling APP_ATTEMPT_REMOVED event when async-scheduling enabled in
CapacityScheduler. Contributed by Tao Yang.
YARN-6714. IllegalStateException while handling APP_ATTEMPT_REMOVED event when async-scheduling enabled in CapacityScheduler. Contributed by Tao Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34f113df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34f113df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34f113df
Branch: refs/heads/HDFS-10285
Commit: 34f113df5cff2cc330fb671296932b8227b11975
Parents: fce7951
Author: Sunil G <su...@apache.org>
Authored: Tue Jul 11 14:52:44 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Tue Jul 11 14:52:44 2017 +0530
----------------------------------------------------------------------
.../scheduler/capacity/CapacityScheduler.java | 5 +-
.../TestCapacitySchedulerAsyncScheduling.java | 149 +++++++++++++++++++
2 files changed, 153 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34f113df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d3186da..0d72860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2392,7 +2392,10 @@ public class CapacityScheduler extends
if (attemptId != null) {
FiCaSchedulerApp app = getApplicationAttempt(attemptId);
- if (app != null) {
+ // Required sanity check for attemptId - when async-scheduling is enabled,
+ // a proposal might be outdated if AM failover has just finished
+ // and the proposal queue was not consumed in time
+ if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
if (app.accept(cluster, request)) {
app.apply(cluster, request);
LOG.info("Allocation proposal accepted");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34f113df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 9854a15..0eb89d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -25,12 +31,29 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class TestCapacitySchedulerAsyncScheduling {
@@ -140,4 +163,130 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.close();
}
+
+ // Testcase for YARN-6714: an outdated proposal of a removed attempt must be skipped
+ @Test (timeout = 30000)
+ public void testCommitProposalForFailedAppAttempt()
+ throws Exception {
+ // disable async-scheduling so the outdated-proposal scenario is driven manually
+ Configuration disableAsyncConf = new Configuration(conf);
+ disableAsyncConf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+ // init RM & NMs & Nodes
+ final MockRM rm = new MockRM(disableAsyncConf);
+ rm.start();
+ final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+ final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+ List<MockNM> nmLst = new ArrayList<>();
+ nmLst.add(nm1);
+ nmLst.add(nm2);
+
+ // wait until both nodes are registered with the scheduler
+ while (
+ ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
+ .nodeCount() < 2) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(2,
+ ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+ .getNodeTracker().nodeCount());
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm.getRMContext().getScheduler();
+ SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
+ SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());
+
+ // launch app; the AM of attempt1 is launched and registered on nm1
+ RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+ FiCaSchedulerApp schedulerApp =
+ scheduler.getApplicationAttempt(am.getApplicationAttemptId());
+
+ // allocate 1 container (id 2, 5 GB) and wait until it is RUNNING on nm2
+ allocateAndLaunchContainers(am, nm2, rm, 1,
+ Resources.createResource(5 * GB), 0, 2);
+
+ // nm1 runs 1 container (app1-container_01, the AM)
+ // nm2 runs 1 container (app1-container_02)
+ Assert.assertEquals(1, sn1.getNumContainers());
+ Assert.assertEquals(1, sn2.getNumContainers());
+
+ // kill app attempt1 directly through the scheduler
+ scheduler.handle(
+ new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(),
+ RMAppAttemptState.KILLED, true));
+ // wait until attempt1's AM container is removed from nm1
+ while (sn1.getCopiedListOfRunningContainers().size() == 1) {
+ Thread.sleep(100);
+ }
+ // heartbeat nm1 until a container (app attempt2) is running on it again
+ while (sn1.getCopiedListOfRunningContainers().size() == 0) {
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(100);
+ }
+
+ // build a reservation proposal for the stopped app attempt1, as
+ // async-scheduling could still commit such a proposal after AM failover;
+ // tryCommit must skip it, so nothing may end up reserved on nm2
+ Resource reservedResource = Resources.createResource(5 * GB);
+ Container container = Container.newInstance(
+ ContainerId.newContainerId(am.getApplicationAttemptId(), 3),
+ sn2.getNodeID(), sn2.getHttpAddress(), reservedResource,
+ Priority.newInstance(0), null);
+ RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
+ .create(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*", reservedResource, 1)),
+ am.getApplicationAttemptId(), sn2.getNodeID(), "user",
+ rm.getRMContext());
+ SchedulerContainer reservedContainer =
+ new SchedulerContainer(schedulerApp, scheduler.getNode(sn2.getNodeID()),
+ rmContainer, "", false);
+ ContainerAllocationProposal reservedForAttempt1Proposal =
+ new ContainerAllocationProposal(reservedContainer, null,
+ reservedContainer, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource);
+ List<ContainerAllocationProposal> reservedProposals = new ArrayList<>();
+ reservedProposals.add(reservedForAttempt1Proposal);
+ ResourceCommitRequest request =
+ new ResourceCommitRequest(null, reservedProposals, null);
+ scheduler.tryCommit(scheduler.getClusterResource(), request);
+ Assert.assertNull("Outdated proposal should not be accepted!",
+ sn2.getReservedContainer());
+
+ rm.stop();
+ }
+
+ // Requests nContainer containers via am, then drives each one to RUNNING on nm
+ private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
+ int nContainer, Resource resource, int priority, int startContainerId)
+ throws Exception {
+ am.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(priority), "*", resource,
+ nContainer)), null);
+ ContainerId lastContainerId = ContainerId
+ .newContainerId(am.getApplicationAttemptId(),
+ startContainerId + nContainer - 1);
+ Assert.assertTrue(
+ rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));
+ // Acquire the allocated containers
+ am.allocate(null, null);
+ // mark each container LAUNCHED, then wait until the RM reports it RUNNING
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ for (int cId = startContainerId;
+ cId < startContainerId + nContainer; cId++) {
+ ContainerId containerId =
+ ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
+ RMContainer rmContainer = cs.getRMContainer(containerId);
+ if (rmContainer != null) {
+ rmContainer.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+ } else {
+ Assert.fail("Cannot find RMContainer");
+ }
+ rm.waitForState(nm,
+ ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
+ RMContainerState.RUNNING);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org