You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/05/04 19:27:48 UTC
[20/50] [abbrv] hadoop git commit: YARN-2674. Fix distributed shell AM container relaunch during RM work preserving restart. Contributed by Shane Kumpf
YARN-2674. Fix distributed shell AM container relaunch during RM work preserving restart. Contributed by Shane Kumpf
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e1382ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e1382ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e1382ac
Branch: refs/heads/HDFS-12943
Commit: 4e1382aca4cf23ca229bdd24e0f143c22449b329
Parents: d6139c5
Author: Billie Rinaldi <bi...@apache.org>
Authored: Mon Apr 30 14:34:51 2018 -0700
Committer: Billie Rinaldi <bi...@apache.org>
Committed: Tue May 1 07:27:47 2018 -0700
----------------------------------------------------------------------
.../distributedshell/ApplicationMaster.java | 68 +++++++++++++-------
.../distributedshell/TestDSAppMaster.java | 8 +--
2 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e1382ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 75f4073..cca5676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -1060,32 +1062,48 @@ public class ApplicationMaster {
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
- String yarnShellId = Integer.toString(yarnShellIdCounter);
- yarnShellIdCounter++;
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", yarnShellId=" + yarnShellId
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemorySize()
- + ", containerResourceVirtualCores"
- + allocatedContainer.getResource().getVirtualCores());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
- Thread launchThread = createLaunchContainerThread(allocatedContainer,
- yarnShellId);
-
- // launch and start the container on a separate thread to keep
- // the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchedContainers.add(allocatedContainer.getId());
- launchThread.start();
+ if (numAllocatedContainers.get() == numTotalContainers) {
+ LOG.info("The requested number of containers have been allocated."
+ + " Releasing the extra container allocation from the RM.");
+ amRMClient.releaseAssignedContainer(allocatedContainer.getId());
+ } else {
+ numAllocatedContainers.addAndGet(1);
+ String yarnShellId = Integer.toString(yarnShellIdCounter);
+ yarnShellIdCounter++;
+ LOG.info(
+ "Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", yarnShellId=" + yarnShellId
+ + ", containerNode="
+ + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI="
+ + allocatedContainer.getNodeHttpAddress()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemorySize()
+ + ", containerResourceVirtualCores"
+ + allocatedContainer.getResource().getVirtualCores());
+
+ Thread launchThread =
+ createLaunchContainerThread(allocatedContainer, yarnShellId);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchedContainers.add(allocatedContainer.getId());
+ launchThread.start();
+
+ // Remove the corresponding request
+ Collection<AMRMClient.ContainerRequest> requests =
+ amRMClient.getMatchingRequests(
+ allocatedContainer.getAllocationRequestId());
+ if (requests.iterator().hasNext()) {
+ AMRMClient.ContainerRequest request = requests.iterator().next();
+ amRMClient.removeContainerRequest(request);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e1382ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index f11bdf8..f2a8041 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -106,7 +106,6 @@ public class TestDSAppMaster {
handler.onContainersAllocated(containers);
Assert.assertEquals("Wrong container allocation count", 1,
master.getAllocatedContainers());
- Mockito.verifyZeroInteractions(mockClient);
Assert.assertEquals("Incorrect number of threads launched", 1,
master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
@@ -121,15 +120,14 @@ public class TestDSAppMaster {
ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
containers.add(generateContainer(id4));
handler.onContainersAllocated(containers);
- Assert.assertEquals("Wrong final container allocation count", 4,
+ Assert.assertEquals("Wrong final container allocation count", 2,
master.getAllocatedContainers());
- Assert.assertEquals("Incorrect number of threads launched", 4,
+ Assert.assertEquals("Incorrect number of threads launched", 2,
master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
- Arrays.asList("1", "2", "3", "4"), master.yarnShellIds);
-
+ Arrays.asList("1", "2"), master.yarnShellIds);
// make sure we handle completion events correctly
List<ContainerStatus> status = new ArrayList<>();
status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org