You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eh...@apache.org on 2018/08/10 11:35:42 UTC
[48/50] [abbrv] hadoop git commit: YARN-8575. Avoid committing allocation proposal to unavailable nodes in async scheduling. Contributed by Tao Yang.
YARN-8575. Avoid committing allocation proposal to unavailable nodes in async scheduling. Contributed by Tao Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a71bf14
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a71bf14
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a71bf14
Branch: refs/heads/HDFS-12090
Commit: 0a71bf145293adbd3728525ab4c36c08d51377d3
Parents: 08d5060
Author: Weiwei Yang <ww...@apache.org>
Authored: Fri Aug 10 14:37:45 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Fri Aug 10 14:37:45 2018 +0800
----------------------------------------------------------------------
.../scheduler/common/fica/FiCaSchedulerApp.java | 12 ++++
.../yarn/server/resourcemanager/MockNodes.java | 6 +-
.../resourcemanager/TestResourceManager.java | 16 ++++-
.../TestCapacitySchedulerAsyncScheduling.java | 69 ++++++++++++++++++++
.../scheduler/capacity/TestUtils.java | 2 +
5 files changed, 100 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a71bf14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9810e98..6a5af81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -429,6 +430,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
schedulerContainer = allocation.getAllocatedOrReservedContainer();
+ // Make sure node is in RUNNING state
+ if (schedulerContainer.getSchedulerNode().getRMNode().getState()
+ != NodeState.RUNNING) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to accept this proposal because node "
+ + schedulerContainer.getSchedulerNode().getNodeID() + " is in "
+ + schedulerContainer.getSchedulerNode().getRMNode().getState()
+ + " state (not RUNNING)");
+ }
+ return false;
+ }
if (schedulerContainer.isAllocated()) {
// When allocate a new container
containerRequest =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a71bf14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 9041132..c444b6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -347,17 +347,17 @@ public class MockNodes {
}
public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
- return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123);
+ return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, null, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName) {
- return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123);
+ return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName, int port) {
- return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
+ return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a71bf14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index 941e477..a66c583 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -86,8 +89,9 @@ public class TestResourceManager {
}
@Test
- public void testResourceAllocation() throws IOException,
- YarnException, InterruptedException {
+ public void testResourceAllocation()
+ throws IOException, YarnException, InterruptedException,
+ TimeoutException {
LOG.info("--- START: testResourceAllocation ---");
final int memory = 4 * 1024;
@@ -105,6 +109,14 @@ public class TestResourceManager {
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, vcores/2));
+ // nodes should be in RUNNING state
+ RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
+ nm1.getNodeId());
+ RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
+ nm2.getNodeId());
+ node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null));
+ node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null));
+
// Submit an application
Application application = new Application("user1", resourceManager);
application.submit();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a71bf14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index c2c1519..840d30d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -43,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -745,6 +749,71 @@ public class TestCapacitySchedulerAsyncScheduling {
rm1.close();
}
+ @Test(timeout = 30000)
+ public void testCommitProposalsForUnusableNode() throws Exception {
+ // disable async-scheduling for simulating complex scene
+ Configuration disableAsyncConf = new Configuration(conf);
+ disableAsyncConf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+ // init RM & NMs
+ final MockRM rm = new MockRM(disableAsyncConf);
+ rm.start();
+ final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+ final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
+ final MockNM nm3 = rm.registerNode("192.168.0.3:2234", 8 * GB);
+ rm.drainEvents();
+ CapacityScheduler cs =
+ (CapacityScheduler) rm.getRMContext().getScheduler();
+ SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+
+ // launch app1-am on nm1
+ RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // launch app2-am on nm2
+ RMApp app2 = rm.submitApp(1 * GB, "app2", "user", null, false, "default",
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // app2 asks 1 * 8G container
+ am2.allocate(ImmutableList.of(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*",
+ Resources.createResource(8 * GB), 1)), null);
+
+ List<Object> reservedProposalParts = new ArrayList<>();
+ final CapacityScheduler spyCs = Mockito.spy(cs);
+ // handle CapacityScheduler#tryCommit
+ Mockito.doAnswer(new Answer<Object>() {
+ public Boolean answer(InvocationOnMock invocation) throws Exception {
+ for (Object argument : invocation.getArguments()) {
+ reservedProposalParts.add(argument);
+ }
+ return false;
+ }
+ }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+ Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+ spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+ // decommission nm1
+ RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
+ cs.getRMContext().getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
+ rm.drainEvents();
+ Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
+ Assert.assertNull(cs.getNode(nm1.getNodeId()));
+
+ // try commit after nm1 decommissioned
+ boolean isSuccess =
+ cs.tryCommit((Resource) reservedProposalParts.get(0),
+ (ResourceCommitRequest) reservedProposalParts.get(1),
+ (Boolean) reservedProposalParts.get(2));
+ Assert.assertFalse(isSuccess);
+ rm.stop();
+ }
+
private ResourceCommitRequest createAllocateFromReservedProposal(
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
SchedulerNode allocateNode, SchedulerNode reservedNode,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a71bf14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index fae63be..b13790d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
@@ -220,6 +221,7 @@ public class TestUtils {
when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
when(rmNode.getHostName()).thenReturn(host);
when(rmNode.getRackName()).thenReturn(rack);
+ when(rmNode.getState()).thenReturn(NodeState.RUNNING);
FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false));
LOG.info("node = " + host + " avail=" + node.getUnallocatedResource());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org