You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/27 15:15:12 UTC
[16/31] hadoop git commit: YARN-4779. Fix AM container allocation
logic in SLS. Contributed by Wangda Tan.
YARN-4779. Fix AM container allocation logic in SLS. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b32ffa27
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b32ffa27
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b32ffa27
Branch: refs/heads/HADOOP-13345
Commit: b32ffa2753e83615b980721b6067fcc35ce54372
Parents: e8694de
Author: Sunil G <su...@apache.org>
Authored: Fri Feb 24 21:39:25 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Fri Feb 24 21:39:25 2017 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/yarn/sls/SLSRunner.java | 20 +-
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 89 +++++---
.../yarn/sls/appmaster/MRAMSimulator.java | 218 ++++++++-----------
.../sls/resourcemanager/MockAMLauncher.java | 115 ++++++++++
.../sls/scheduler/SLSCapacityScheduler.java | 24 ++
5 files changed, 305 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 61738fb..61b7f36 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,12 +56,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
@@ -119,10 +122,10 @@ public class SLSRunner {
this.printSimulation = printsimulation;
metricsOutputDir = outputDir;
- nmMap = new HashMap<NodeId, NMSimulator>();
- queueAppNumMap = new HashMap<String, Integer>();
- amMap = new HashMap<String, AMSimulator>();
- amClassMap = new HashMap<String, Class>();
+ nmMap = new HashMap<>();
+ queueAppNumMap = new HashMap<>();
+ amMap = new ConcurrentHashMap<>();
+ amClassMap = new HashMap<>();
// runner configuration
conf = new Configuration(false);
@@ -179,7 +182,14 @@ public class SLSRunner {
}
rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
- rm = new ResourceManager();
+
+ final SLSRunner se = this;
+ rm = new ResourceManager() {
+ @Override
+ protected ApplicationMasterLauncher createAMLauncher() {
+ return new MockAMLauncher(se, this.rmContext, amMap);
+ }
+ };
rm.init(rmConf);
rm.start();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index d61bf02..5b03d51 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -107,11 +109,19 @@ public abstract class AMSimulator extends TaskRunner.Task {
// progress
protected int totalContainers;
protected int finishedContainers;
+
+ // waiting for AM container
+ volatile boolean isAMContainerRunning = false;
+ volatile Container amContainer;
protected final Logger LOG = Logger.getLogger(AMSimulator.class);
-
+
+ // resource for AM container
+ private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
+ private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
+
public AMSimulator() {
- this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+ this.responseQueue = new LinkedBlockingQueue<>();
}
public void init(int id, int heartbeatInterval,
@@ -142,23 +152,30 @@ public abstract class AMSimulator extends TaskRunner.Task {
// submit application, waiting until ACCEPTED
submitApp();
- // register application master
- registerAM();
-
// track app metrics
trackApp();
}
+ public synchronized void notifyAMContainerLaunched(Container masterContainer)
+ throws Exception {
+ this.amContainer = masterContainer;
+ this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
+ registerAM();
+ isAMContainerRunning = true;
+ }
+
@Override
public void middleStep() throws Exception {
- // process responses in the queue
- processResponseQueue();
-
- // send out request
- sendContainerRequest();
-
- // check whether finish
- checkStop();
+ if (isAMContainerRunning) {
+ // process responses in the queue
+ processResponseQueue();
+
+ // send out request
+ sendContainerRequest();
+
+ // check whether finish
+ checkStop();
+ }
}
@Override
@@ -168,6 +185,22 @@ public abstract class AMSimulator extends TaskRunner.Task {
if (isTracked) {
untrackApp();
}
+
+ // Finish AM container
+ if (amContainer != null) {
+ LOG.info("AM container = " + amContainer.getId() + " reported to finish");
+ se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
+ amContainer.getId());
+ } else {
+ LOG.info("AM container is null");
+ }
+
+ if (null == appAttemptId) {
+ // If appAttemptId == null, AM is not launched from RM's perspective, so
+ // it's unnecessary to finish am as well
+ return;
+ }
+
// unregister application master
final FinishApplicationMasterRequest finishAMRequest = recordFactory
.newRecordInstance(FinishApplicationMasterRequest.class);
@@ -256,7 +289,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
conLauContext.setLocalResources(new HashMap<String, LocalResource>());
conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
appSubContext.setAMContainerSpec(conLauContext);
- appSubContext.setUnmanagedAM(true);
+ appSubContext.setResource(Resources
+ .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
+ MR_AM_CONTAINER_RESOURCE_VCORES));
subAppRequest.setApplicationSubmissionContext(appSubContext);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -267,22 +302,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
});
LOG.info(MessageFormat.format("Submit a new application {0}", appId));
-
- // waiting until application ACCEPTED
- RMApp app = rm.getRMContext().getRMApps().get(appId);
- while(app.getState() != RMAppState.ACCEPTED) {
- Thread.sleep(10);
- }
-
- // Waiting until application attempt reach LAUNCHED
- // "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
- this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
- .getCurrentAppAttempt().getAppAttemptId();
- RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
- .getCurrentAppAttempt();
- while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
- Thread.sleep(10);
- }
}
private void registerAM()
@@ -335,7 +354,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
for (ContainerSimulator cs : csList) {
String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
- String rackname = rackHostNames[0];
+ String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
@@ -383,4 +402,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
public int getNumTasks() {
return totalContainers;
}
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index da267a1..e726b09 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Protocol;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
@@ -63,10 +64,10 @@ public class MRAMSimulator extends AMSimulator {
private static final int PRIORITY_REDUCE = 10;
private static final int PRIORITY_MAP = 20;
-
+
// pending maps
private LinkedList<ContainerSimulator> pendingMaps =
- new LinkedList<ContainerSimulator>();
+ new LinkedList<>();
// pending failed maps
private LinkedList<ContainerSimulator> pendingFailedMaps =
@@ -107,14 +108,9 @@ public class MRAMSimulator extends AMSimulator {
private int mapTotal = 0;
private int reduceFinished = 0;
private int reduceTotal = 0;
- // waiting for AM container
- private boolean isAMContainerRunning = false;
- private Container amContainer;
+
// finished
private boolean isFinished = false;
- // resource for AM container
- private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
- private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
@@ -131,83 +127,34 @@ public class MRAMSimulator extends AMSimulator {
for (ContainerSimulator cs : containerList) {
if (cs.getType().equals("map")) {
cs.setPriority(PRIORITY_MAP);
- pendingMaps.add(cs);
+ allMaps.add(cs);
} else if (cs.getType().equals("reduce")) {
cs.setPriority(PRIORITY_REDUCE);
- pendingReduces.add(cs);
+ allReduces.add(cs);
}
}
- allMaps.addAll(pendingMaps);
- allReduces.addAll(pendingReduces);
- mapTotal = pendingMaps.size();
- reduceTotal = pendingReduces.size();
+
+ LOG.info(MessageFormat
+ .format("Added new job with {0} mapper and {1} reducers",
+ allMaps.size(), allReduces.size()));
+
+ mapTotal = allMaps.size();
+ reduceTotal = allReduces.size();
totalContainers = mapTotal + reduceTotal;
}
@Override
- public void firstStep() throws Exception {
- super.firstStep();
-
- requestAMContainer();
- }
-
- /**
- * send out request for AM container
- */
- protected void requestAMContainer()
- throws YarnException, IOException, InterruptedException {
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest amRequest = createResourceRequest(
- BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
- MR_AM_CONTAINER_RESOURCE_VCORES),
- ResourceRequest.ANY, 1, 1);
- ask.add(amRequest);
- LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
- "request for its AM", appId));
- final AllocateRequest request = this.createAllocateRequest(ask);
-
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
- Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
- .get(appAttemptId.getApplicationId())
- .getRMAppAttempt(appAttemptId).getAMRMToken();
- ugi.addTokenIdentifier(token.decodeIdentifier());
- AllocateResponse response = ugi.doAs(
- new PrivilegedExceptionAction<AllocateResponse>() {
- @Override
- public AllocateResponse run() throws Exception {
- return rm.getApplicationMasterService().allocate(request);
- }
- });
- if (response != null) {
- responseQueue.put(response);
+ public synchronized void notifyAMContainerLaunched(Container masterContainer)
+ throws Exception {
+ if (null != masterContainer) {
+ restart();
+ super.notifyAMContainerLaunched(masterContainer);
}
}
@Override
@SuppressWarnings("unchecked")
- protected void processResponseQueue()
- throws InterruptedException, YarnException, IOException {
- // Check whether receive the am container
- if (!isAMContainerRunning) {
- if (!responseQueue.isEmpty()) {
- AllocateResponse response = responseQueue.take();
- if (response != null
- && !response.getAllocatedContainers().isEmpty()) {
- // Get AM container
- Container container = response.getAllocatedContainers().get(0);
- se.getNmMap().get(container.getNodeId())
- .addNewContainer(container, -1L);
- // Start AM container
- amContainer = container;
- LOG.debug(MessageFormat.format("Application {0} starts its " +
- "AM container ({1}).", appId, amContainer.getId()));
- isAMContainerRunning = true;
- }
- }
- return;
- }
-
+ protected void processResponseQueue() throws Exception {
while (! responseQueue.isEmpty()) {
AllocateResponse response = responseQueue.take();
@@ -228,12 +175,16 @@ public class MRAMSimulator extends AMSimulator {
assignedReduces.remove(containerId);
reduceFinished ++;
finishedContainers ++;
- } else {
+ } else if (amContainer.getId().equals(containerId)){
// am container released event
isFinished = true;
LOG.info(MessageFormat.format("Application {0} goes to " +
"finish.", appId));
}
+
+ if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
+ lastStep();
+ }
} else {
// container to be killed
if (assignedMaps.containsKey(containerId)) {
@@ -244,10 +195,9 @@ public class MRAMSimulator extends AMSimulator {
LOG.debug(MessageFormat.format("Application {0} has one " +
"reducer killed ({1}).", appId, containerId));
pendingFailedReduces.add(assignedReduces.remove(containerId));
- } else {
+ } else if (amContainer.getId().equals(containerId)){
LOG.info(MessageFormat.format("Application {0}'s AM is " +
- "going to be killed. Restarting...", appId));
- restart();
+ "going to be killed. Waiting for rescheduling...", appId));
}
}
}
@@ -255,11 +205,8 @@ public class MRAMSimulator extends AMSimulator {
// check finished
if (isAMContainerRunning &&
- (mapFinished == mapTotal) &&
- (reduceFinished == reduceTotal)) {
- // to release the AM container
- se.getNmMap().get(amContainer.getNodeId())
- .cleanupContainer(amContainer.getId());
+ (mapFinished >= mapTotal) &&
+ (reduceFinished >= reduceTotal)) {
isAMContainerRunning = false;
LOG.debug(MessageFormat.format("Application {0} sends out event " +
"to clean up its AM container.", appId));
@@ -293,21 +240,38 @@ public class MRAMSimulator extends AMSimulator {
*/
private void restart()
throws YarnException, IOException, InterruptedException {
- // clear
- finishedContainers = 0;
+ // clear
isFinished = false;
- mapFinished = 0;
- reduceFinished = 0;
pendingFailedMaps.clear();
pendingMaps.clear();
pendingReduces.clear();
pendingFailedReduces.clear();
- pendingMaps.addAll(allMaps);
- pendingReduces.addAll(pendingReduces);
- isAMContainerRunning = false;
+
+ // Only add totalMaps - finishedMaps
+ int added = 0;
+ for (ContainerSimulator cs : allMaps) {
+ if (added >= mapTotal - mapFinished) {
+ break;
+ }
+ pendingMaps.add(cs);
+ }
+
+ // And same, only add totalReduces - finishedReduces
+ added = 0;
+ for (ContainerSimulator cs : allReduces) {
+ if (added >= reduceTotal - reduceFinished) {
+ break;
+ }
+ pendingReduces.add(cs);
+ }
amContainer = null;
- // resent am container request
- requestAMContainer();
+ }
+
+ private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) {
+ List<ContainerSimulator> list = new ArrayList<>();
+ list.addAll(left);
+ list.addAll(right);
+ return list;
}
@Override
@@ -319,44 +283,48 @@ public class MRAMSimulator extends AMSimulator {
// send out request
List<ResourceRequest> ask = null;
- if (isAMContainerRunning) {
- if (mapFinished != mapTotal) {
- // map phase
- if (! pendingMaps.isEmpty()) {
- ask = packageRequests(pendingMaps, PRIORITY_MAP);
- LOG.debug(MessageFormat.format("Application {0} sends out " +
- "request for {1} mappers.", appId, pendingMaps.size()));
- scheduledMaps.addAll(pendingMaps);
- pendingMaps.clear();
- } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
- ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
- LOG.debug(MessageFormat.format("Application {0} sends out " +
- "requests for {1} failed mappers.", appId,
- pendingFailedMaps.size()));
- scheduledMaps.addAll(pendingFailedMaps);
- pendingFailedMaps.clear();
- }
- } else if (reduceFinished != reduceTotal) {
- // reduce phase
- if (! pendingReduces.isEmpty()) {
- ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
- LOG.debug(MessageFormat.format("Application {0} sends out " +
- "requests for {1} reducers.", appId, pendingReduces.size()));
- scheduledReduces.addAll(pendingReduces);
- pendingReduces.clear();
- } else if (! pendingFailedReduces.isEmpty()
- && scheduledReduces.isEmpty()) {
- ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
- LOG.debug(MessageFormat.format("Application {0} sends out " +
- "request for {1} failed reducers.", appId,
- pendingFailedReduces.size()));
- scheduledReduces.addAll(pendingFailedReduces);
- pendingFailedReduces.clear();
- }
+ if (mapFinished != mapTotal) {
+ // map phase
+ if (!pendingMaps.isEmpty()) {
+ ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
+ PRIORITY_MAP);
+ LOG.debug(MessageFormat
+ .format("Application {0} sends out " + "request for {1} mappers.",
+ appId, pendingMaps.size()));
+ scheduledMaps.addAll(pendingMaps);
+ pendingMaps.clear();
+ } else if (!pendingFailedMaps.isEmpty()) {
+ ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
+ PRIORITY_MAP);
+ LOG.debug(MessageFormat.format(
+ "Application {0} sends out " + "requests for {1} failed mappers.",
+ appId, pendingFailedMaps.size()));
+ scheduledMaps.addAll(pendingFailedMaps);
+ pendingFailedMaps.clear();
+ }
+ } else if (reduceFinished != reduceTotal) {
+ // reduce phase
+ if (!pendingReduces.isEmpty()) {
+ ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
+ PRIORITY_REDUCE);
+ LOG.debug(MessageFormat
+ .format("Application {0} sends out " + "requests for {1} reducers.",
+ appId, pendingReduces.size()));
+ scheduledReduces.addAll(pendingReduces);
+ pendingReduces.clear();
+ } else if (!pendingFailedReduces.isEmpty()) {
+ ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
+ PRIORITY_REDUCE);
+ LOG.debug(MessageFormat.format(
+ "Application {0} sends out " + "request for {1} failed reducers.",
+ appId, pendingFailedReduces.size()));
+ scheduledReduces.addAll(pendingFailedReduces);
+ pendingFailedReduces.clear();
}
}
+
if (ask == null) {
- ask = new ArrayList<ResourceRequest>();
+ ask = new ArrayList<>();
}
final AllocateRequest request = createAllocateRequest(ask);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
new file mode 100644
index 0000000..20cf3e5
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+
+import java.util.Map;
+
+public class MockAMLauncher extends ApplicationMasterLauncher
+ implements EventHandler<AMLauncherEvent> {
+ private static final Log LOG = LogFactory.getLog(
+ MockAMLauncher.class);
+
+ Map<String, AMSimulator> amMap;
+ SLSRunner se;
+
+ public MockAMLauncher(SLSRunner se, RMContext rmContext,
+ Map<String, AMSimulator> amMap) {
+ super(rmContext);
+ this.amMap = amMap;
+ this.se = se;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ // Do nothing
+ }
+
+ private void setupAMRMToken(RMAppAttempt appAttempt) {
+ // Setup AMRMToken
+ Token<AMRMTokenIdentifier> amrmToken =
+ super.context.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttempt.getAppAttemptId());
+ ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handle(AMLauncherEvent event) {
+ if (AMLauncherEventType.LAUNCH == event.getType()) {
+ ApplicationId appId =
+ event.getAppAttempt().getAppAttemptId().getApplicationId();
+
+ // find AMSimulator
+ for (AMSimulator ams : amMap.values()) {
+ if (ams.getApplicationId() != null && ams.getApplicationId().equals(
+ appId)) {
+ try {
+ Container amContainer = event.getAppAttempt().getMasterContainer();
+
+ setupAMRMToken(event.getAppAttempt());
+
+ // Notify RMAppAttempt to change state
+ super.context.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
+ RMAppAttemptEventType.LAUNCHED));
+
+ ams.notifyAMContainerLaunched(
+ event.getAppAttempt().getMasterContainer());
+ LOG.info("Notify AM launcher launched:" + amContainer.getId());
+
+ se.getNmMap().get(amContainer.getNodeId())
+ .addNewContainer(amContainer, 100000000L);
+
+ return;
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+ }
+
+ throw new YarnRuntimeException(
+ "Didn't find any AMSimulator for applicationId=" + appId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b32ffa27/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 8388273..cd4377e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -556,6 +556,30 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
}
);
+ metrics.register("variable.cluster.reserved.memory",
+ new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ if(getRootQueueMetrics() == null) {
+ return 0L;
+ } else {
+ return getRootQueueMetrics().getReservedMB();
+ }
+ }
+ }
+ );
+ metrics.register("variable.cluster.reserved.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ if(getRootQueueMetrics() == null) {
+ return 0;
+ } else {
+ return getRootQueueMetrics().getReservedVirtualCores();
+ }
+ }
+ }
+ );
}
private void registerContainerAppNumMetrics() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org