You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/07/27 00:12:40 UTC
[17/50] hadoop git commit: YARN-8511. When AM releases a container,
RM removes allocation tags before it is released by NM. (Weiwei Yang
via wangda)
YARN-8511. When AM releases a container, RM removes allocation tags before it is released by NM. (Weiwei Yang via wangda)
Change-Id: I6f9f409f2ef685b405cbff547dea9623bf3322d9
(cherry picked from commit 752dcce5f4cf0f6ebcb40a61f622f1a885c4bda7)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/44beab0b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/44beab0b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/44beab0b
Branch: refs/remotes/origin/branch-3.1
Commit: 44beab0b63cb853f2f711d9592fcd947241d112e
Parents: ac9155d
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Jul 16 10:54:41 2018 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Jul 16 11:04:08 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 6 ++
.../yarn/sls/scheduler/RMNodeWrapper.java | 6 ++
.../rmcontainer/RMContainerImpl.java | 5 -
.../server/resourcemanager/rmnode/RMNode.java | 6 ++
.../resourcemanager/rmnode/RMNodeImpl.java | 5 +
.../scheduler/SchedulerNode.java | 15 +++
.../yarn/server/resourcemanager/MockNodes.java | 5 +
.../rmcontainer/TestRMContainerImpl.java | 16 ++-
.../scheduler/TestAbstractYarnScheduler.java | 104 +++++++++++++++++++
9 files changed, 162 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 0c99139..69946c8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -219,6 +220,11 @@ public class NodeInfo {
}
@Override
+ public RMContext getRMContext() {
+ return null;
+ }
+
+ @Override
public Resource getPhysicalResource() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 78645e9..a96b790 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -207,6 +208,11 @@ public class RMNodeWrapper implements RMNode {
}
@Override
+ public RMContext getRMContext() {
+ return node.getRMContext();
+ }
+
+ @Override
public Resource getPhysicalResource() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index b5c8e7c..efac666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -701,11 +701,6 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
- // Notify AllocationTagsManager
- container.rmContext.getAllocationTagsManager().removeContainer(
- container.getNodeId(), container.getContainerId(),
- container.getAllocationTags());
-
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
container.finishTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 872f2a6..68a780e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
/**
* Node managers information on available resources
@@ -189,4 +190,9 @@ public interface RMNode {
* @return a map of each allocation tag and its count.
*/
Map<String, Long> getAllocationTagsWithCount();
+
+ /**
+ * @return the RM context associated with this RM node.
+ */
+ RMContext getRMContext();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b942afa..dfd93e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1541,4 +1541,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return context.getAllocationTagsManager()
.getAllocationTagsWithCount(getNodeID());
}
+
+ @Override
+ public RMContext getRMContext() {
+ return this.context;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index d5bfc57..59771fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -74,6 +75,7 @@ public abstract class SchedulerNode {
private final RMNode rmNode;
private final String nodeName;
+ private final RMContext rmContext;
private volatile Set<String> labels = null;
@@ -83,6 +85,7 @@ public abstract class SchedulerNode {
public SchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> labels) {
this.rmNode = node;
+ this.rmContext = node.getRMContext();
this.unallocatedResource = Resources.clone(node.getTotalCapability());
this.totalResource = Resources.clone(node.getTotalCapability());
if (usePortForNodeName) {
@@ -242,6 +245,18 @@ public abstract class SchedulerNode {
launchedContainers.remove(containerId);
Container container = info.container.getContainer();
+
+ // Remove the allocation tags only once the container has actually
+ // been released on the NM. When an AM releases a container, the NM
+ // may take some time to really release it; keeping the tags visible
+ // at the RM until then lets the RM keep respecting them while
+ // scheduling new containers.
+ if (rmContext != null && rmContext.getAllocationTagsManager() != null) {
+ rmContext.getAllocationTagsManager()
+ .removeContainer(container.getNodeId(),
+ container.getId(), container.getAllocationTags());
+ }
+
updateResourceForReleasedContainer(container);
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 84105d9..9041132 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -286,6 +286,11 @@ public class MockNodes {
}
@Override
+ public RMContext getRMContext() {
+ return null;
+ }
+
+ @Override
public Resource getPhysicalResource() {
return this.physicalResource;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 7a930cd..1115e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -60,10 +60,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -401,6 +405,7 @@ public class TestRMContainerImpl {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+ container.setAllocationTags(ImmutableSet.of("mapper"));
ConcurrentMap<ApplicationId, RMApp> rmApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
@@ -423,11 +428,14 @@ public class TestRMContainerImpl {
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
+ RMNode rmNode = new RMNodeImpl(nodeId, rmContext,
+ "localhost", 0, 0, null, Resource.newInstance(10240, 10), null);
+ SchedulerNode schedulerNode = new FiCaSchedulerNode(rmNode, false);
+
/* First container: ALLOCATED -> KILLED */
RMContainerImpl rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
- rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -437,6 +445,7 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
+ schedulerNode.allocateContainer(rmContainer);
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -446,6 +455,7 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.KILL));
+ schedulerNode.releaseContainer(container.getId(), true);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -465,6 +475,7 @@ public class TestRMContainerImpl {
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
+ schedulerNode.allocateContainer(rmContainer);
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -477,6 +488,7 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED));
+ schedulerNode.releaseContainer(container.getId(), true);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -496,6 +508,7 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
+ schedulerNode.allocateContainer(rmContainer);
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId,
@@ -511,6 +524,7 @@ public class TestRMContainerImpl {
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED));
+ schedulerNode.releaseContainer(container.getId(), true);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/44beab0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index c0f8d39..ba409b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -27,9 +27,16 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -416,6 +423,103 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
}
}
+ @Test(timeout = 30000l)
+ public void testContainerReleaseWithAllocationTags() throws Exception {
+ // Currently this can only be tested against the capacity scheduler.
+ if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
+ final String testTag1 = "some-tag";
+ final String testTag2 = "some-other-tag";
+ YarnConfiguration conf = getConf();
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ MockNM nm1 = new MockNM("127.0.0.1:1234",
+ 10240, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 =
+ rm1.submitApp(200, "name", "user", new HashMap<>(), false, "default",
+ -1, null, "Test", false, true);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // allocate 1 container with tag1
+ SchedulingRequest sr = SchedulingRequest
+ .newInstance(1l, Priority.newInstance(1),
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
+ Sets.newHashSet(testTag1),
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+ null);
+
+ // allocate 3 containers with tag2
+ SchedulingRequest sr1 = SchedulingRequest
+ .newInstance(2l, Priority.newInstance(1),
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
+ Sets.newHashSet(testTag2),
+ ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
+ null);
+
+ AllocateRequest ar = AllocateRequest.newBuilder()
+ .schedulingRequests(Lists.newArrayList(sr, sr1)).build();
+ am1.allocate(ar);
+ nm1.nodeHeartbeat(true);
+
+ List<Container> allocated = new ArrayList<>();
+ while (allocated.size() < 4) {
+ AllocateResponse rsp = am1
+ .allocate(new ArrayList<>(), new ArrayList<>());
+ allocated.addAll(rsp.getAllocatedContainers());
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(1000);
+ }
+
+ Assert.assertEquals(4, allocated.size());
+
+ Set<Container> containers = allocated.stream()
+ .filter(container -> container.getAllocationRequestId() == 1l)
+ .collect(Collectors.toSet());
+ Assert.assertNotNull(containers);
+ Assert.assertEquals(1, containers.size());
+ ContainerId cid = containers.iterator().next().getId();
+
+ // mock container start
+ rm1.getRMContext().getScheduler()
+ .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
+
+ // verifies the allocation is made with correct number of tags
+ Map<String, Long> nodeTags = rm1.getRMContext()
+ .getAllocationTagsManager()
+ .getAllocationTagsWithCount(nm1.getNodeId());
+ Assert.assertNotNull(nodeTags.get(testTag1));
+ Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
+
+ // release a container
+ am1.allocate(new ArrayList<>(), Lists.newArrayList(cid));
+
+ // before NM confirms, the tag should still exist
+ nodeTags = rm1.getRMContext().getAllocationTagsManager()
+ .getAllocationTagsWithCount(nm1.getNodeId());
+ Assert.assertNotNull(nodeTags);
+ Assert.assertNotNull(nodeTags.get(testTag1));
+ Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
+
+ // The NM reports back that the container is released;
+ // the RM should then clean up the tag.
+ ContainerStatus cs = ContainerStatus.newInstance(cid,
+ ContainerState.COMPLETE, "", 0);
+ nm1.nodeHeartbeat(Lists.newArrayList(cs), true);
+
+ // Wait on condition
+ // 1) tag1 doesn't exist anymore
+ // 2) num of tag2 is still 3
+ GenericTestUtils.waitFor(() -> {
+ Map<String, Long> tags = rm1.getRMContext()
+ .getAllocationTagsManager()
+ .getAllocationTagsWithCount(nm1.getNodeId());
+ return tags.get(testTag1) == null &&
+ tags.get(testTag2).intValue() == 3;
+ }, 500, 3000);
+ }
+ }
+
@Test(timeout=60000)
public void testContainerReleasedByNode() throws Exception {
System.out.println("Starting testContainerReleasedByNode");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org