You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/09/27 22:23:21 UTC
svn commit: r1527059 [7/9] - in /hadoop/common/trunk:
hadoop-assemblies/src/main/resources/assemblies/ hadoop-project/
hadoop-project/src/site/ hadoop-tools/ hadoop-tools/hadoop-openstack/
hadoop-tools/hadoop-sls/ hadoop-tools/hadoop-sls/dev-support/ h...
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Simulated MapReduce ApplicationMaster for the Scheduler Load Simulator.
+ * <p>
+ * The simulator first obtains an AM container, then requests containers for
+ * all map tasks, and only once every map has finished requests containers for
+ * the reduce tasks. Killed map/reduce containers are re-requested; a killed
+ * container that is not a known map/reduce container is treated as the AM
+ * container being killed and restarts the whole simulated job.
+ */
+public class MRAMSimulator extends AMSimulator {
+  /*
+   * Vocabulary used:
+   *   pending   -> requests which are NOT yet sent to RM
+   *   scheduled -> requests which are sent to RM but not yet assigned
+   *   assigned  -> requests which are assigned to a container
+   *   completed -> requests whose container has completed
+   *
+   * Maps are scheduled as soon as their requests are received. Reduces are
+   * scheduled only when all maps have finished (slow-start is not currently
+   * supported).
+   */
+
+  private static final int PRIORITY_REDUCE = 10;
+  private static final int PRIORITY_MAP = 20;
+
+  // pending maps
+  private LinkedList<ContainerSimulator> pendingMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // pending failed (killed) maps, waiting to be re-requested
+  private LinkedList<ContainerSimulator> pendingFailedMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // scheduled maps
+  private LinkedList<ContainerSimulator> scheduledMaps =
+      new LinkedList<ContainerSimulator>();
+
+  // assigned maps, keyed by the container they run in
+  private Map<ContainerId, ContainerSimulator> assignedMaps =
+      new HashMap<ContainerId, ContainerSimulator>();
+
+  // reduces which are not yet scheduled
+  private LinkedList<ContainerSimulator> pendingReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // pending failed (killed) reduces, waiting to be re-requested
+  private LinkedList<ContainerSimulator> pendingFailedReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // scheduled reduces
+  private LinkedList<ContainerSimulator> scheduledReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // assigned reduces, keyed by the container they run in
+  private Map<ContainerId, ContainerSimulator> assignedReduces =
+      new HashMap<ContainerId, ContainerSimulator>();
+
+  // all maps & reduces (kept so the job can be replayed on AM restart)
+  private LinkedList<ContainerSimulator> allMaps =
+      new LinkedList<ContainerSimulator>();
+  private LinkedList<ContainerSimulator> allReduces =
+      new LinkedList<ContainerSimulator>();
+
+  // counters
+  private int mapFinished = 0;
+  private int mapTotal = 0;
+  private int reduceFinished = 0;
+  private int reduceTotal = 0;
+  // whether the AM container has been allocated and started
+  private boolean isAMContainerRunning = false;
+  private Container amContainer;
+  // whether the application has finished
+  private boolean isFinished = false;
+  // resource for AM container
+  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
+  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
+
+  public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
+
+  /**
+   * Initializes the simulator and splits the trace's containers into map and
+   * reduce tasks according to {@link ContainerSimulator#getType()}.
+   * Containers whose type is neither "map" nor "reduce" are ignored.
+   * All parameters are forwarded unchanged to {@code AMSimulator#init}.
+   */
+  public void init(int id, int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId) {
+    super.init(id, heartbeatInterval, containerList, rm, se,
+        traceStartTime, traceFinishTime, user, queue,
+        isTracked, oldAppId);
+    amtype = "mapreduce";
+
+    // get map/reduce tasks; constant-first equals() tolerates a null type
+    for (ContainerSimulator cs : containerList) {
+      if ("map".equals(cs.getType())) {
+        cs.setPriority(PRIORITY_MAP);
+        pendingMaps.add(cs);
+      } else if ("reduce".equals(cs.getType())) {
+        cs.setPriority(PRIORITY_REDUCE);
+        pendingReduces.add(cs);
+      }
+    }
+    allMaps.addAll(pendingMaps);
+    allReduces.addAll(pendingReduces);
+    mapTotal = pendingMaps.size();
+    reduceTotal = pendingReduces.size();
+    totalContainers = mapTotal + reduceTotal;
+  }
+
+  @Override
+  public void firstStep()
+      throws YarnException, IOException, InterruptedException {
+    super.firstStep();
+
+    requestAMContainer();
+  }
+
+  /**
+   * Sends out the request for the AM container and blocks until the RM
+   * allocates it. While waiting, it sleeps one second between polls and
+   * calls {@link #sendContainerRequest()} (which asks for nothing while the
+   * AM container is not running), then takes the next response from
+   * {@code responseQueue}. The allocated container is registered with its
+   * simulated NodeManager with lifetime {@code -1} (the AM-container marker).
+   */
+  protected void requestAMContainer()
+      throws YarnException, IOException, InterruptedException {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest amRequest = createResourceRequest(
+        BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
+            MR_AM_CONTAINER_RESOURCE_VCORES),
+        ResourceRequest.ANY, 1, 1);
+    ask.add(amRequest);
+    LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
+        "request for its AM", appId));
+    AllocateResponse response =
+        allocateAsAppAttempt(this.createAllocateRequest(ask));
+
+    // waiting until the AM container is allocated
+    while (true) {
+      if (response != null && ! response.getAllocatedContainers().isEmpty()) {
+        // get AM container
+        Container container = response.getAllocatedContainers().get(0);
+        se.getNmMap().get(container.getNodeId())
+            .addNewContainer(container, -1L);
+        // start AM container
+        amContainer = container;
+        LOG.debug(MessageFormat.format("Application {0} starts its " +
+            "AM container ({1}).", appId, amContainer.getId()));
+        isAMContainerRunning = true;
+        break;
+      }
+      // this sleep time is different from HeartBeat
+      Thread.sleep(1000);
+      // send out empty request
+      sendContainerRequest();
+      response = responseQueue.take();
+    }
+  }
+
+  /**
+   * Drains {@code responseQueue}. For each response it:
+   * <ol>
+   *   <li>accounts for completed containers (finished or killed
+   *       maps/reduces, AM release, or AM kill which triggers a restart);</li>
+   *   <li>releases the AM container once every map and reduce has
+   *       finished;</li>
+   *   <li>binds newly allocated containers to scheduled maps first, then to
+   *       scheduled reduces, and launches them on the simulated NM.</li>
+   * </ol>
+   */
+  @Override
+  protected void processResponseQueue()
+      throws InterruptedException, YarnException, IOException {
+    while (! responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (! response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+            if (assignedMaps.containsKey(containerId)) {
+              LOG.debug(MessageFormat.format("Application {0} has one " +
+                  "mapper finished ({1}).", appId, containerId));
+              assignedMaps.remove(containerId);
+              mapFinished ++;
+              finishedContainers ++;
+            } else if (assignedReduces.containsKey(containerId)) {
+              LOG.debug(MessageFormat.format("Application {0} has one " +
+                  "reducer finished ({1}).", appId, containerId));
+              assignedReduces.remove(containerId);
+              reduceFinished ++;
+              finishedContainers ++;
+            } else {
+              // am container released event
+              isFinished = true;
+              LOG.info(MessageFormat.format("Application {0} goes to " +
+                  "finish.", appId));
+            }
+          } else {
+            // container to be killed
+            if (assignedMaps.containsKey(containerId)) {
+              LOG.debug(MessageFormat.format("Application {0} has one " +
+                  "mapper killed ({1}).", appId, containerId));
+              pendingFailedMaps.add(assignedMaps.remove(containerId));
+            } else if (assignedReduces.containsKey(containerId)) {
+              LOG.debug(MessageFormat.format("Application {0} has one " +
+                  "reducer killed ({1}).", appId, containerId));
+              pendingFailedReduces.add(assignedReduces.remove(containerId));
+            } else {
+              LOG.info(MessageFormat.format("Application {0}'s AM is " +
+                  "going to be killed. Restarting...", appId));
+              restart();
+            }
+          }
+        }
+      }
+
+      // check finished
+      if (isAMContainerRunning &&
+          (mapFinished == mapTotal) &&
+          (reduceFinished == reduceTotal)) {
+        // to release the AM container
+        se.getNmMap().get(amContainer.getNodeId())
+            .cleanupContainer(amContainer.getId());
+        isAMContainerRunning = false;
+        LOG.debug(MessageFormat.format("Application {0} sends out event " +
+            "to clean up its AM container.", appId));
+        isFinished = true;
+      }
+
+      // check allocated containers: maps take precedence over reduces
+      for (Container container : response.getAllocatedContainers()) {
+        if (! scheduledMaps.isEmpty()) {
+          ContainerSimulator cs = scheduledMaps.remove();
+          LOG.debug(MessageFormat.format("Application {0} starts to " +
+              "launch a mapper ({1}).", appId, container.getId()));
+          assignedMaps.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+              .addNewContainer(container, cs.getLifeTime());
+        } else if (! this.scheduledReduces.isEmpty()) {
+          ContainerSimulator cs = scheduledReduces.remove();
+          LOG.debug(MessageFormat.format("Application {0} starts to " +
+              "launch a reducer ({1}).", appId, container.getId()));
+          assignedReduces.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+              .addNewContainer(container, cs.getLifeTime());
+        }
+      }
+    }
+  }
+
+  /**
+   * Restarts the simulated job because the AM container was killed: resets
+   * the progress counters, re-queues every map and reduce as pending, and
+   * blocks until a new AM container is allocated.
+   * <p>
+   * NOTE(review): scheduled/assigned collections are intentionally left
+   * untouched here (as before); confirm whether stale entries from the
+   * killed attempt should also be discarded.
+   */
+  private void restart()
+      throws YarnException, IOException, InterruptedException {
+    // clear
+    finishedContainers = 0;
+    isFinished = false;
+    mapFinished = 0;
+    reduceFinished = 0;
+    pendingFailedMaps.clear();
+    pendingMaps.clear();
+    pendingReduces.clear();
+    pendingFailedReduces.clear();
+    pendingMaps.addAll(allMaps);
+    // re-queue the full reduce set; adding pendingReduces to itself after
+    // clearing it would leave no reduces to run and the job would never end
+    pendingReduces.addAll(allReduces);
+    isAMContainerRunning = false;
+    amContainer = null;
+    // resent am container request
+    requestAMContainer();
+  }
+
+  /**
+   * Sends one allocate request to the RM and enqueues its response.
+   * <p>
+   * Nothing is sent once the application has finished. While the AM
+   * container runs, the request asks for pending maps (or, once no fresh
+   * maps remain and none are outstanding, for failed maps); after all maps
+   * have finished it does the same for reduces. Otherwise the request
+   * carries an empty ask and only reports progress.
+   */
+  @Override
+  protected void sendContainerRequest()
+      throws YarnException, IOException, InterruptedException {
+    if (isFinished) {
+      return;
+    }
+
+    // send out request
+    List<ResourceRequest> ask = null;
+    if (isAMContainerRunning) {
+      if (mapFinished != mapTotal) {
+        // map phase
+        if (! pendingMaps.isEmpty()) {
+          ask = packageRequests(pendingMaps, PRIORITY_MAP);
+          LOG.debug(MessageFormat.format("Application {0} sends out " +
+              "request for {1} mappers.", appId, pendingMaps.size()));
+          scheduledMaps.addAll(pendingMaps);
+          pendingMaps.clear();
+        } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
+          ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
+          LOG.debug(MessageFormat.format("Application {0} sends out " +
+              "requests for {1} failed mappers.", appId,
+              pendingFailedMaps.size()));
+          scheduledMaps.addAll(pendingFailedMaps);
+          pendingFailedMaps.clear();
+        }
+      } else if (reduceFinished != reduceTotal) {
+        // reduce phase
+        if (! pendingReduces.isEmpty()) {
+          ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
+          LOG.debug(MessageFormat.format("Application {0} sends out " +
+              "requests for {1} reducers.", appId, pendingReduces.size()));
+          scheduledReduces.addAll(pendingReduces);
+          pendingReduces.clear();
+        } else if (! pendingFailedReduces.isEmpty()
+            && scheduledReduces.isEmpty()) {
+          ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
+          LOG.debug(MessageFormat.format("Application {0} sends out " +
+              "request for {1} failed reducers.", appId,
+              pendingFailedReduces.size()));
+          scheduledReduces.addAll(pendingFailedReduces);
+          pendingFailedReduces.clear();
+        }
+      }
+    }
+    if (ask == null) {
+      ask = new ArrayList<ResourceRequest>();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+
+    AllocateResponse response = allocateAsAppAttempt(request);
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  /**
+   * Calls the RM's ApplicationMasterService {@code allocate} as this
+   * application attempt: a remote user named after the attempt id is given
+   * the attempt's AMRM token identifier and the call is made inside
+   * {@code doAs}.
+   *
+   * @param request the allocate request to send
+   * @return the RM's response (callers null-check it)
+   */
+  private AllocateResponse allocateAsAppAttempt(final AllocateRequest request)
+      throws IOException, InterruptedException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    return ugi.doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            return rm.getApplicationMasterService().allocate(request);
+          }
+        });
+  }
+
+  /** Records the end time once the application has finished. */
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  /** Runs the base-class last step, then drops all task bookkeeping. */
+  @Override
+  public void lastStep() {
+    super.lastStep();
+
+    // clear data structures
+    allMaps.clear();
+    allReduces.clear();
+    assignedMaps.clear();
+    assignedReduces.clear();
+    pendingFailedMaps.clear();
+    pendingFailedReduces.clear();
+    pendingMaps.clear();
+    pendingReduces.clear();
+    scheduledMaps.clear();
+    scheduledReduces.clear();
+    responseQueue.clear();
+  }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.conf;
+
+/**
+ * Configuration keys (and their default values) used by the YARN Scheduler
+ * Load Simulator. Every key is rooted at {@link #PREFIX}; each sub-component
+ * (runner, scheduler, metrics, NM, AM, container) has its own sub-prefix.
+ */
+public class SLSConfiguration {
+
+  /** Root namespace of every SLS key. */
+  public static final String PREFIX = "yarn.sls.";
+
+  /* ----------------------------- runner ----------------------------- */
+
+  public static final String RUNNER_PREFIX = PREFIX + "runner.";
+
+  /** Runner pool size key, with its default. */
+  public static final String RUNNER_POOL_SIZE = RUNNER_PREFIX + "pool.size";
+  public static final int RUNNER_POOL_SIZE_DEFAULT = 10;
+
+  /* ---------------------------- scheduler --------------------------- */
+
+  public static final String SCHEDULER_PREFIX = PREFIX + "scheduler.";
+
+  /** Key naming the scheduler class (no default defined here). */
+  public static final String RM_SCHEDULER = SCHEDULER_PREFIX + "class";
+
+  /* ----------------------------- metrics ---------------------------- */
+
+  public static final String METRICS_PREFIX = PREFIX + "metrics.";
+
+  /** Metrics switch key (no default defined here). */
+  public static final String METRICS_SWITCH = METRICS_PREFIX + "switch";
+
+  /** Metrics web port key, with its default. */
+  public static final String METRICS_WEB_ADDRESS_PORT =
+      METRICS_PREFIX + "web.address.port";
+  public static final int METRICS_WEB_ADDRESS_PORT_DEFAULT = 10001;
+
+  /** Metrics output directory key (no default defined here). */
+  public static final String METRICS_OUTPUT_DIR = METRICS_PREFIX + "output";
+
+  /** Metrics timer window size key, with its default. */
+  public static final String METRICS_TIMER_WINDOW_SIZE =
+      METRICS_PREFIX + "timer.window.size";
+  public static final int METRICS_TIMER_WINDOW_SIZE_DEFAULT = 100;
+
+  /** Metrics record interval key (milliseconds), with its default. */
+  public static final String METRICS_RECORD_INTERVAL_MS =
+      METRICS_PREFIX + "record.interval.ms";
+  public static final int METRICS_RECORD_INTERVAL_MS_DEFAULT = 1000;
+
+  /* --------------------------- nodemanager -------------------------- */
+
+  public static final String NM_PREFIX = PREFIX + "nm.";
+
+  /** Simulated NM memory key (MB), with its default. */
+  public static final String NM_MEMORY_MB = NM_PREFIX + "memory.mb";
+  public static final int NM_MEMORY_MB_DEFAULT = 10240;
+
+  /** Simulated NM vcores key, with its default. */
+  public static final String NM_VCORES = NM_PREFIX + "vcores";
+  public static final int NM_VCORES_DEFAULT = 10;
+
+  /** Simulated NM heartbeat interval key (milliseconds), with its default. */
+  public static final String NM_HEARTBEAT_INTERVAL_MS =
+      NM_PREFIX + "heartbeat.interval.ms";
+  public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+
+  /* ------------------------ application master ---------------------- */
+
+  public static final String AM_PREFIX = PREFIX + "am.";
+
+  /** Simulated AM heartbeat interval key (milliseconds), with its default. */
+  public static final String AM_HEARTBEAT_INTERVAL_MS =
+      AM_PREFIX + "heartbeat.interval.ms";
+  public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+
+  /** AM type key prefix; note the trailing dot (a suffix is appended). */
+  public static final String AM_TYPE = AM_PREFIX + "type.";
+
+  /* ---------------------------- container --------------------------- */
+
+  public static final String CONTAINER_PREFIX = PREFIX + "container.";
+
+  /** Container memory key (MB), with its default. */
+  public static final String CONTAINER_MEMORY_MB =
+      CONTAINER_PREFIX + "memory.mb";
+  public static final int CONTAINER_MEMORY_MB_DEFAULT = 1024;
+
+  /** Container vcores key, with its default. */
+  public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
+  public static final int CONTAINER_VCORES_DEFAULT = 1;
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.nodemanager;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+
+public class NMSimulator extends TaskRunner.Task {
+ // node resource
+ private RMNode node;
+ // master key
+ private MasterKey masterKey;
+ // containers with various STATE
+ private List<ContainerId> completedContainerList;
+ private List<ContainerId> releasedContainerList;
+ private DelayQueue<ContainerSimulator> containerQueue;
+ private Map<ContainerId, ContainerSimulator> runningContainers;
+ private List<ContainerId> amContainerList;
+ // resource manager
+ private ResourceManager rm;
+ // heart beat response id
+ private int RESPONSE_ID = 1;
+ private final static Logger LOG = Logger.getLogger(NMSimulator.class);
+
+ public void init(String nodeIdStr, int memory, int cores,
+ int dispatchTime, int heartBeatInterval, ResourceManager rm)
+ throws IOException, YarnException {
+ super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
+ heartBeatInterval);
+ // create resource
+ String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
+ this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
+ BuilderUtils.newResource(memory, cores));
+ this.rm = rm;
+ // init data structures
+ completedContainerList =
+ Collections.synchronizedList(new ArrayList<ContainerId>());
+ releasedContainerList =
+ Collections.synchronizedList(new ArrayList<ContainerId>());
+ containerQueue = new DelayQueue<ContainerSimulator>();
+ amContainerList =
+ Collections.synchronizedList(new ArrayList<ContainerId>());
+ runningContainers =
+ new ConcurrentHashMap<ContainerId, ContainerSimulator>();
+ // register NM with RM
+ RegisterNodeManagerRequest req =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ req.setNodeId(node.getNodeID());
+ req.setResource(node.getTotalCapability());
+ req.setHttpPort(80);
+ RegisterNodeManagerResponse response = rm.getResourceTrackerService()
+ .registerNodeManager(req);
+ masterKey = response.getNMTokenMasterKey();
+ }
+
+ @Override
+ public void firstStep() throws YarnException, IOException {
+ // do nothing
+ }
+
+ @Override
+ public void middleStep() {
+ // we check the lifetime for each running containers
+ ContainerSimulator cs = null;
+ synchronized(completedContainerList) {
+ while ((cs = containerQueue.poll()) != null) {
+ runningContainers.remove(cs.getId());
+ completedContainerList.add(cs.getId());
+ LOG.debug(MessageFormat.format("Container {0} has completed",
+ cs.getId()));
+ }
+ }
+
+ // send heart beat
+ NodeHeartbeatRequest beatRequest =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ beatRequest.setLastKnownNMTokenMasterKey(masterKey);
+ NodeStatus ns = Records.newRecord(NodeStatus.class);
+
+ ns.setContainersStatuses(generateContainerStatusList());
+ ns.setNodeId(node.getNodeID());
+ ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
+ ns.setResponseId(RESPONSE_ID ++);
+ ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
+ beatRequest.setNodeStatus(ns);
+ try {
+ NodeHeartbeatResponse beatResponse =
+ rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
+ if (! beatResponse.getContainersToCleanup().isEmpty()) {
+ // remove from queue
+ synchronized(releasedContainerList) {
+ for (ContainerId containerId : beatResponse.getContainersToCleanup()){
+ if (amContainerList.contains(containerId)) {
+ // AM container (not killed?, only release)
+ synchronized(amContainerList) {
+ amContainerList.remove(containerId);
+ }
+ LOG.debug(MessageFormat.format("NodeManager {0} releases " +
+ "an AM ({1}).", node.getNodeID(), containerId));
+ } else {
+ cs = runningContainers.remove(containerId);
+ containerQueue.remove(cs);
+ releasedContainerList.add(containerId);
+ LOG.debug(MessageFormat.format("NodeManager {0} releases a " +
+ "container ({1}).", node.getNodeID(), containerId));
+ }
+ }
+ }
+ }
+ if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) {
+ lastStep();
+ }
+ } catch (YarnException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void lastStep() {
+ // do nothing
+ }
+
+ /**
+ * catch status of all containers located on current node
+ */
+ private ArrayList<ContainerStatus> generateContainerStatusList() {
+ ArrayList<ContainerStatus> csList = new ArrayList<ContainerStatus>();
+ // add running containers
+ for (ContainerSimulator container : runningContainers.values()) {
+ csList.add(newContainerStatus(container.getId(),
+ ContainerState.RUNNING, ContainerExitStatus.SUCCESS));
+ }
+ synchronized(amContainerList) {
+ for (ContainerId cId : amContainerList) {
+ csList.add(newContainerStatus(cId,
+ ContainerState.RUNNING, ContainerExitStatus.SUCCESS));
+ }
+ }
+ // add complete containers
+ synchronized(completedContainerList) {
+ for (ContainerId cId : completedContainerList) {
+ LOG.debug(MessageFormat.format("NodeManager {0} completed" +
+ " container ({1}).", node.getNodeID(), cId));
+ csList.add(newContainerStatus(
+ cId, ContainerState.COMPLETE, ContainerExitStatus.SUCCESS));
+ }
+ completedContainerList.clear();
+ }
+ // released containers
+ synchronized(releasedContainerList) {
+ for (ContainerId cId : releasedContainerList) {
+ LOG.debug(MessageFormat.format("NodeManager {0} released container" +
+ " ({1}).", node.getNodeID(), cId));
+ csList.add(newContainerStatus(
+ cId, ContainerState.COMPLETE, ContainerExitStatus.ABORTED));
+ }
+ releasedContainerList.clear();
+ }
+ return csList;
+ }
+
+ private ContainerStatus newContainerStatus(ContainerId cId,
+ ContainerState state,
+ int exitState) {
+ ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+ cs.setContainerId(cId);
+ cs.setState(state);
+ cs.setExitStatus(exitState);
+ return cs;
+ }
+
+ public RMNode getNode() {
+ return node;
+ }
+
+ /**
+ * launch a new container with the given life time
+ */
+ public void addNewContainer(Container container, long lifeTimeMS) {
+ LOG.debug(MessageFormat.format("NodeManager {0} launches a new " +
+ "container ({1}).", node.getNodeID(), container.getId()));
+ if (lifeTimeMS != -1) {
+ // normal container
+ ContainerSimulator cs = new ContainerSimulator(container.getId(),
+ container.getResource(), lifeTimeMS + System.currentTimeMillis(),
+ lifeTimeMS);
+ containerQueue.add(cs);
+ runningContainers.put(cs.getId(), cs);
+ } else {
+ // AM container
+ // -1 means AMContainer
+ synchronized(amContainerList) {
+ amContainerList.add(container.getId());
+ }
+ }
+ }
+
+ /**
+ * clean up an AM container and add to completed list
+ * @param containerId id of the container to be cleaned
+ */
+ public void cleanupContainer(ContainerId containerId) {
+ synchronized(amContainerList) {
+ amContainerList.remove(containerId);
+ }
+ synchronized(completedContainerList) {
+ completedContainerList.add(containerId);
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.nodemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+ .UpdatedContainerInfo;
+
+public class NodeInfo {
+ private static int NODE_ID = 0;
+
+ public static NodeId newNodeID(String host, int port) {
+ return NodeId.newInstance(host, port);
+ }
+
+ private static class FakeRMNodeImpl implements RMNode {
+ private NodeId nodeId;
+ private String hostName;
+ private String nodeAddr;
+ private String httpAddress;
+ private int cmdPort;
+ private Resource perNode;
+ private String rackName;
+ private String healthReport;
+ private NodeState state;
+ private List<ContainerId> toCleanUpContainers;
+ private List<ApplicationId> toCleanUpApplications;
+
+ public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+ Resource perNode, String rackName, String healthReport,
+ int cmdPort, String hostName, NodeState state) {
+ this.nodeId = nodeId;
+ this.nodeAddr = nodeAddr;
+ this.httpAddress = httpAddress;
+ this.perNode = perNode;
+ this.rackName = rackName;
+ this.healthReport = healthReport;
+ this.cmdPort = cmdPort;
+ this.hostName = hostName;
+ this.state = state;
+ toCleanUpApplications = new ArrayList<ApplicationId>();
+ toCleanUpContainers = new ArrayList<ContainerId>();
+ }
+
+ public NodeId getNodeID() {
+ return nodeId;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public int getCommandPort() {
+ return cmdPort;
+ }
+
+ public int getHttpPort() {
+ return 0;
+ }
+
+ public String getNodeAddress() {
+ return nodeAddr;
+ }
+
+ public String getHttpAddress() {
+ return httpAddress;
+ }
+
+ public String getHealthReport() {
+ return healthReport;
+ }
+
+ public long getLastHealthReportTime() {
+ return 0;
+ }
+
+ public Resource getTotalCapability() {
+ return perNode;
+ }
+
+ public String getRackName() {
+ return rackName;
+ }
+
+ public Node getNode() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public NodeState getState() {
+ return state;
+ }
+
+ public List<ContainerId> getContainersToCleanUp() {
+ return toCleanUpContainers;
+ }
+
+ public List<ApplicationId> getAppsToCleanup() {
+ return toCleanUpApplications;
+ }
+
+ public void updateNodeHeartbeatResponseForCleanup(
+ NodeHeartbeatResponse response) {
+ }
+
+ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
+ return null;
+ }
+
+ public List<UpdatedContainerInfo> pullContainerUpdates() {
+ ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>();
+
+ ArrayList<ContainerStatus> list2 = new ArrayList<ContainerStatus>();
+ for(ContainerId cId : this.toCleanUpContainers) {
+ list2.add(ContainerStatus.newInstance(cId, ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS));
+ }
+ list.add(new UpdatedContainerInfo(new ArrayList<ContainerStatus>(),
+ list2));
+ return list;
+ }
+ }
+
+ public static RMNode newNodeInfo(String rackName, String hostName,
+ final Resource resource, int port) {
+ final NodeId nodeId = newNodeID(hostName, port);
+ final String nodeAddr = hostName + ":" + port;
+ final String httpAddress = hostName;
+
+ return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
+ resource, rackName, "Me good",
+ port, hostName, null);
+ }
+
+ public static RMNode newNodeInfo(String rackName, String hostName,
+ final Resource resource) {
+ return newNodeInfo(rackName, hostName, resource, NODE_ID++);
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+public class CapacitySchedulerMetrics extends SchedulerMetrics {
+
+ public CapacitySchedulerMetrics() {
+ super();
+ }
+
+ @Override
+ public void trackQueue(String queueName) {
+ trackedQueues.add(queueName);
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class ContainerSimulator implements Delayed {
+ // id
+ private ContainerId id;
+ // resource allocated
+ private Resource resource;
+ // end time
+ private long endTime;
+ // life time (ms)
+ private long lifeTime;
+ // host name
+ private String hostname;
+ // priority
+ private int priority;
+ // type
+ private String type;
+
+ /**
+ * invoked when AM schedules containers to allocate
+ */
+ public ContainerSimulator(Resource resource, long lifeTime,
+ String hostname, int priority, String type) {
+ this.resource = resource;
+ this.lifeTime = lifeTime;
+ this.hostname = hostname;
+ this.priority = priority;
+ this.type = type;
+ }
+
+ /**
+ * invoke when NM schedules containers to run
+ */
+ public ContainerSimulator(ContainerId id, Resource resource, long endTime,
+ long lifeTime) {
+ this.id = id;
+ this.resource = resource;
+ this.endTime = endTime;
+ this.lifeTime = lifeTime;
+ }
+
+ public Resource getResource() {
+ return resource;
+ }
+
+ public ContainerId getId() {
+ return id;
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ if (!(o instanceof ContainerSimulator)) {
+ throw new IllegalArgumentException(
+ "Parameter must be a ContainerSimulator instance");
+ }
+ ContainerSimulator other = (ContainerSimulator) o;
+ return (int) Math.signum(endTime - other.endTime);
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(endTime - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public long getLifeTime() {
+ return lifeTime;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setPriority(int p) {
+ priority = p;
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+ .AppSchedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+ .FairScheduler;
+
+import com.codahale.metrics.Gauge;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+
+public class FairSchedulerMetrics extends SchedulerMetrics {
+
+ private int totalMemoryMB = Integer.MAX_VALUE;
+ private int totalVCores = Integer.MAX_VALUE;
+ private boolean maxReset = false;
+
+ public FairSchedulerMetrics() {
+ super();
+ appTrackedMetrics.add("demand.memory");
+ appTrackedMetrics.add("demand.vcores");
+ appTrackedMetrics.add("usage.memory");
+ appTrackedMetrics.add("usage.vcores");
+ appTrackedMetrics.add("minshare.memory");
+ appTrackedMetrics.add("minshare.vcores");
+ appTrackedMetrics.add("maxshare.memory");
+ appTrackedMetrics.add("maxshare.vcores");
+ appTrackedMetrics.add("fairshare.memory");
+ appTrackedMetrics.add("fairshare.vcores");
+ queueTrackedMetrics.add("demand.memory");
+ queueTrackedMetrics.add("demand.vcores");
+ queueTrackedMetrics.add("usage.memory");
+ queueTrackedMetrics.add("usage.vcores");
+ queueTrackedMetrics.add("minshare.memory");
+ queueTrackedMetrics.add("minshare.vcores");
+ queueTrackedMetrics.add("maxshare.memory");
+ queueTrackedMetrics.add("maxshare.vcores");
+ queueTrackedMetrics.add("fairshare.memory");
+ queueTrackedMetrics.add("fairshare.vcores");
+ }
+
+ @Override
+ public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
+ super.trackApp(appAttemptId, oldAppId);
+ FairScheduler fair = (FairScheduler) scheduler;
+ final AppSchedulable app = fair.getSchedulerApp(appAttemptId)
+ .getAppSchedulable();
+ metrics.register("variable.app." + oldAppId + ".demand.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getDemand().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".demand.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getDemand().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".usage.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getResourceUsage().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".usage.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getResourceUsage().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".minshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getMinShare().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".minshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getMinShare().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".maxshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return Math.min(app.getMaxShare().getMemory(), totalMemoryMB);
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".maxshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return Math.min(app.getMaxShare().getVirtualCores(), totalVCores);
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".fairshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getFairShare().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.app." + oldAppId + ".fairshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return app.getFairShare().getVirtualCores();
+ }
+ }
+ );
+ }
+
+ @Override
+ public void trackQueue(String queueName) {
+ trackedQueues.add(queueName);
+ FairScheduler fair = (FairScheduler) scheduler;
+ final FSQueue queue = fair.getQueueManager().getQueue(queueName);
+ metrics.register("variable.queue." + queueName + ".demand.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getDemand().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".demand.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getDemand().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".usage.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getResourceUsage().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".usage.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getResourceUsage().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".minshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getMinShare().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".minshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getMinShare().getVirtualCores();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".maxshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ if (! maxReset &&
+ SLSRunner.simulateInfoMap.containsKey("Number of nodes") &&
+ SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") &&
+ SLSRunner.simulateInfoMap.containsKey("Node VCores")) {
+ int numNMs = Integer.parseInt(
+ SLSRunner.simulateInfoMap.get("Number of nodes").toString());
+ int numMemoryMB = Integer.parseInt(
+ SLSRunner.simulateInfoMap.get("Node memory (MB)").toString());
+ int numVCores = Integer.parseInt(
+ SLSRunner.simulateInfoMap.get("Node VCores").toString());
+
+ totalMemoryMB = numNMs * numMemoryMB;
+ totalVCores = numNMs * numVCores;
+ maxReset = false;
+ }
+
+ return Math.min(queue.getMaxShare().getMemory(), totalMemoryMB);
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".maxshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return Math.min(queue.getMaxShare().getVirtualCores(), totalVCores);
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".fairshare.memory",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getFairShare().getMemory();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".fairshare.vcores",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.getFairShare().getVirtualCores();
+ }
+ }
+ );
+ }
+
+ @Override
+ public void untrackQueue(String queueName) {
+ trackedQueues.remove(queueName);
+ metrics.remove("variable.queue." + queueName + ".demand.memory");
+ metrics.remove("variable.queue." + queueName + ".demand.vcores");
+ metrics.remove("variable.queue." + queueName + ".usage.memory");
+ metrics.remove("variable.queue." + queueName + ".usage.vcores");
+ metrics.remove("variable.queue." + queueName + ".minshare.memory");
+ metrics.remove("variable.queue." + queueName + ".minshare.vcores");
+ metrics.remove("variable.queue." + queueName + ".maxshare.memory");
+ metrics.remove("variable.queue." + queueName + ".maxshare.vcores");
+ metrics.remove("variable.queue." + queueName + ".fairshare.memory");
+ metrics.remove("variable.queue." + queueName + ".fairshare.vcores");
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
+ .FifoScheduler;
+
+import com.codahale.metrics.Gauge;
+
+public class FifoSchedulerMetrics extends SchedulerMetrics {
+
+ public FifoSchedulerMetrics() {
+ super();
+ }
+
+ @Override
+ public void trackQueue(String queueName) {
+ trackedQueues.add(queueName);
+ FifoScheduler fifo = (FifoScheduler) scheduler;
+ // for FifoScheduler, only DEFAULT_QUEUE
+ // here the three parameters doesn't affect results
+ final QueueInfo queue = fifo.getQueueInfo(queueName, false, false);
+ // track currentCapacity, maximumCapacity (always 1.0f)
+ metrics.register("variable.queue." + queueName + ".currentcapacity",
+ new Gauge<Float>() {
+ @Override
+ public Float getValue() {
+ return queue.getCurrentCapacity();
+ }
+ }
+ );
+ metrics.register("variable.queue." + queueName + ".",
+ new Gauge<Float>() {
+ @Override
+ public Float getValue() {
+ return queue.getCurrentCapacity();
+ }
+ }
+ );
+ }
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .NodeUpdateSchedulerEvent;
+
+public class NodeUpdateSchedulerEventWrapper extends NodeUpdateSchedulerEvent {
+
+ public NodeUpdateSchedulerEventWrapper(NodeUpdateSchedulerEvent event) {
+ super(new RMNodeWrapper(event.getRMNode()));
+ }
+
+}
Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+ .UpdatedContainerInfo;
+
+import java.util.Collections;
+import java.util.List;
+
+public class RMNodeWrapper implements RMNode {
+ private RMNode node;
+ private List<UpdatedContainerInfo> updates;
+ private boolean pulled = false;
+
+ public RMNodeWrapper(RMNode node) {
+ this.node = node;
+ updates = node.pullContainerUpdates();
+ }
+
+ @Override
+ public NodeId getNodeID() {
+ return node.getNodeID();
+ }
+
+ @Override
+ public String getHostName() {
+ return node.getHostName();
+ }
+
+ @Override
+ public int getCommandPort() {
+ return node.getCommandPort();
+ }
+
+ @Override
+ public int getHttpPort() {
+ return node.getHttpPort();
+ }
+
+ @Override
+ public String getNodeAddress() {
+ return node.getNodeAddress();
+ }
+
+ @Override
+ public String getHttpAddress() {
+ return node.getHttpAddress();
+ }
+
+ @Override
+ public String getHealthReport() {
+ return node.getHealthReport();
+ }
+
+ @Override
+ public long getLastHealthReportTime() {
+ return node.getLastHealthReportTime();
+ }
+
+ @Override
+ public Resource getTotalCapability() {
+ return node.getTotalCapability();
+ }
+
+ @Override
+ public String getRackName() {
+ return node.getRackName();
+ }
+
+ @Override
+ public Node getNode() {
+ return node.getNode();
+ }
+
+ @Override
+ public NodeState getState() {
+ return node.getState();
+ }
+
+ @Override
+ public List<ContainerId> getContainersToCleanUp() {
+ return node.getContainersToCleanUp();
+ }
+
+ @Override
+ public List<ApplicationId> getAppsToCleanup() {
+ return node.getAppsToCleanup();
+ }
+
+ @Override
+ public void updateNodeHeartbeatResponseForCleanup(
+ NodeHeartbeatResponse nodeHeartbeatResponse) {
+ node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
+ }
+
+ @Override
+ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
+ return node.getLastNodeHeartBeatResponse();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<UpdatedContainerInfo> pullContainerUpdates() {
+ List<UpdatedContainerInfo> list = Collections.EMPTY_LIST;
+ if (! pulled) {
+ list = updates;
+ pulled = true;
+ }
+ return list;
+ }
+
+ List<UpdatedContainerInfo> getContainerUpdates() {
+ return updates;
+ }
+
+}