You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/09/27 22:23:21 UTC

svn commit: r1527059 [7/9] - in /hadoop/common/trunk: hadoop-assemblies/src/main/resources/assemblies/ hadoop-project/ hadoop-project/src/site/ hadoop-tools/ hadoop-tools/hadoop-openstack/ hadoop-tools/hadoop-sls/ hadoop-tools/hadoop-sls/dev-support/ h...

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.log4j.Logger;
+
+public class MRAMSimulator extends AMSimulator {
+  /*
+  Vocabulary Used: 
+  pending -> requests which are NOT yet sent to RM
+  scheduled -> requests which are sent to RM but not yet assigned
+  assigned -> requests which are assigned to a container
+  completed -> request corresponding to which container has completed
+  
+  Maps are scheduled as soon as their requests are received. Reduces are
+  scheduled once all maps have finished (slow-start is not supported yet).
+  */
+  
+  private static final int PRIORITY_REDUCE = 10;
+  private static final int PRIORITY_MAP = 20;
+  
+  // pending maps: requests that have not been sent to the RM yet
+  private LinkedList<ContainerSimulator> pendingMaps =
+          new LinkedList<ContainerSimulator>();
+  
+  // maps whose container ended unsuccessfully; to be requested again
+  private LinkedList<ContainerSimulator> pendingFailedMaps =
+          new LinkedList<ContainerSimulator>();
+  
+  // scheduled maps: requested from the RM, no container received yet
+  private LinkedList<ContainerSimulator> scheduledMaps =
+          new LinkedList<ContainerSimulator>();
+  
+  // assigned maps, keyed by the id of the container they were given
+  private Map<ContainerId, ContainerSimulator> assignedMaps =
+          new HashMap<ContainerId, ContainerSimulator>();
+  
+  // pending reduces: requests that have not been sent to the RM yet
+  private LinkedList<ContainerSimulator> pendingReduces =
+          new LinkedList<ContainerSimulator>();
+  
+  // reduces whose container ended unsuccessfully; to be requested again
+  private LinkedList<ContainerSimulator> pendingFailedReduces =
+          new LinkedList<ContainerSimulator>();
+ 
+  // scheduled reduces: requested from the RM, no container received yet
+  private LinkedList<ContainerSimulator> scheduledReduces =
+          new LinkedList<ContainerSimulator>();
+  
+  // assigned reduces, keyed by the id of the container they were given
+  private Map<ContainerId, ContainerSimulator> assignedReduces =
+          new HashMap<ContainerId, ContainerSimulator>();
+  
+  // every map & reduce of the job, as collected by init()
+  private LinkedList<ContainerSimulator> allMaps =
+          new LinkedList<ContainerSimulator>();
+  private LinkedList<ContainerSimulator> allReduces =
+          new LinkedList<ContainerSimulator>();
+
+  // counters: finished tasks and total tasks, per task kind
+  private int mapFinished = 0;
+  private int mapTotal = 0;
+  private int reduceFinished = 0;
+  private int reduceTotal = 0;
+  // true from the start of the AM container until its release or a restart
+  private boolean isAMContainerRunning = false;
+  private Container amContainer;
+  // set once the application is done; stops further allocate requests
+  private boolean isFinished = false;
+  // resource for AM container
+  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
+  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
+
+  public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
+
+  /** Sets up the AM and splits its containers into maps and reduces. */
+  public void init(int id, int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId) {
+    super.init(id, heartbeatInterval, containerList, rm, se, traceStartTime,
+            traceFinishTime, user, queue, isTracked, oldAppId);
+    amtype = "mapreduce";
+    // queue every task by its type; containers of other types are ignored
+    for (ContainerSimulator task : containerList) {
+      if (task.getType().equals("map")) {
+        task.setPriority(PRIORITY_MAP);
+        pendingMaps.add(task);
+      } else if (task.getType().equals("reduce")) {
+        task.setPriority(PRIORITY_REDUCE);
+        pendingReduces.add(task);
+      }
+    }
+    // remember the complete task lists and count the tasks of each kind
+    allMaps.addAll(pendingMaps);
+    mapTotal = pendingMaps.size();
+    allReduces.addAll(pendingReduces);
+    reduceTotal = pendingReduces.size();
+    totalContainers = mapTotal + reduceTotal;
+  }
+
+  @Override
+  public void firstStep() throws YarnException, IOException,
+          InterruptedException {
+    super.firstStep();
+    // task containers are only requested while an AM container is running
+    requestAMContainer();
+  }
+
+  /**
+   * Asks the RM for the AM container and blocks until one is allocated,
+   * polling once per second; the container is then started on its node.
+   */
+  protected void requestAMContainer()
+          throws YarnException, IOException, InterruptedException {
+    // the ask consists of one AM-sized request for ResourceRequest.ANY
+    ResourceRequest amRequest = createResourceRequest(
+            BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
+                    MR_AM_CONTAINER_RESOURCE_VCORES),
+            ResourceRequest.ANY, 1, 1);
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ask.add(amRequest);
+    LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
+            "request for its AM", appId));
+    final AllocateRequest request = this.createAllocateRequest(ask);
+
+    // talk to the RM as this application attempt, carrying its AMRM token
+    UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+            .get(appAttemptId.getApplicationId())
+            .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    PrivilegedExceptionAction<AllocateResponse> allocateCall =
+            new PrivilegedExceptionAction<AllocateResponse>() {
+      @Override
+      public AllocateResponse run() throws Exception {
+        return rm.getApplicationMasterService().allocate(request);
+      }
+    };
+    AllocateResponse response = ugi.doAs(allocateCall);
+
+    // poll with further (empty) requests until a container is allocated;
+    // this sleep time is different from HeartBeat
+    while (response == null || response.getAllocatedContainers().isEmpty()) {
+      Thread.sleep(1000);
+      sendContainerRequest();
+      response = responseQueue.take();
+    }
+
+    // the first allocated container becomes the AM; it is handed to its
+    // node with the life time -1
+    Container container = response.getAllocatedContainers().get(0);
+    se.getNmMap().get(container.getNodeId()).addNewContainer(container, -1L);
+    amContainer = container;
+    LOG.debug(MessageFormat.format("Application {0} starts its " +
+            "AM container ({1}).", appId, amContainer.getId()));
+    isAMContainerRunning = true;
+  }
+
+  /**
+   * Drains the response queue: books finished and killed containers,
+   * releases the AM container once every task is done, and starts the
+   * newly allocated containers for the scheduled maps and reduces.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void processResponseQueue()
+          throws InterruptedException, YarnException, IOException {
+    while (! responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+      // check completed containers (nothing happens for an empty list)
+      for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+        ContainerId containerId = cs.getContainerId();
+        if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          if (assignedMaps.containsKey(containerId)) {
+            LOG.debug(MessageFormat.format("Application {0} has one " +
+                    "mapper finished ({1}).", appId, containerId));
+            assignedMaps.remove(containerId);
+            mapFinished ++;
+            finishedContainers ++;
+          } else if (assignedReduces.containsKey(containerId)) {
+            LOG.debug(MessageFormat.format("Application {0} has one " +
+                    "reducer finished ({1}).", appId, containerId));
+            assignedReduces.remove(containerId);
+            reduceFinished ++;
+            finishedContainers ++;
+          } else {
+            // am container released event
+            isFinished = true;
+            LOG.info(MessageFormat.format("Application {0} goes to " +
+                    "finish.", appId));
+          }
+        } else {
+          // container to be killed: the task is queued for a new request
+          if (assignedMaps.containsKey(containerId)) {
+            LOG.debug(MessageFormat.format("Application {0} has one " +
+                    "mapper killed ({1}).", appId, containerId));
+            pendingFailedMaps.add(assignedMaps.remove(containerId));
+          } else if (assignedReduces.containsKey(containerId)) {
+            LOG.debug(MessageFormat.format("Application {0} has one " +
+                    "reducer killed ({1}).", appId, containerId));
+            pendingFailedReduces.add(assignedReduces.remove(containerId));
+          } else {
+            // '' is MessageFormat's escape for a literal apostrophe
+            LOG.info(MessageFormat.format("Application {0}''s AM is " +
+                    "going to be killed. Restarting...", appId));
+            restart();
+          }
+        }
+      }
+      if (isAMContainerRunning &&
+              (mapFinished == mapTotal) &&
+              (reduceFinished == reduceTotal)) {
+        // all tasks are done: release the AM container
+        se.getNmMap().get(amContainer.getNodeId())
+                .cleanupContainer(amContainer.getId());
+        isAMContainerRunning = false;
+        LOG.debug(MessageFormat.format("Application {0} sends out event " +
+                "to clean up its AM container.", appId));
+        isFinished = true;
+      }
+      // check allocated containers: scheduled maps are served before reduces
+      for (Container container : response.getAllocatedContainers()) {
+        if (! scheduledMaps.isEmpty()) {
+          ContainerSimulator cs = scheduledMaps.remove();
+          LOG.debug(MessageFormat.format("Application {0} starts a " +
+                  "launch a mapper ({1}).", appId, container.getId()));
+          assignedMaps.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+                  .addNewContainer(container, cs.getLifeTime());
+        } else if (! this.scheduledReduces.isEmpty()) {
+          ContainerSimulator cs = scheduledReduces.remove();
+          LOG.debug(MessageFormat.format("Application {0} starts a " +
+                  "launch a reducer ({1}).", appId, container.getId()));
+          assignedReduces.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+                  .addNewContainer(container, cs.getLifeTime());
+        }
+      }
+    }
+  }
+  
+  /**
+   * Restarts the application after its AM container was killed: progress
+   * is reset, every task is made pending and a new AM is requested.
+   */
+  private void restart()
+          throws YarnException, IOException, InterruptedException {
+    finishedContainers = 0;
+    isFinished = false;
+    mapFinished = 0;
+    reduceFinished = 0;
+    pendingFailedMaps.clear();
+    pendingMaps.clear();
+    pendingReduces.clear();
+    pendingFailedReduces.clear();
+    // refill from the complete lists; the pending lists were just emptied
+    pendingMaps.addAll(allMaps);
+    pendingReduces.addAll(allReduces);
+    isAMContainerRunning = false;
+    amContainer = null;
+    requestAMContainer();
+  }
+
+  /**
+   * Sends one allocate call to the RM and queues the response. While the AM
+   * container runs, the call asks for the not-yet-requested maps and, once
+   * all maps have finished, for the reduces; otherwise the ask is empty.
+   */
+  @Override
+  protected void sendContainerRequest()
+          throws YarnException, IOException, InterruptedException {
+    if (isFinished) {
+      return;
+    }
+    // collect what to ask for; failed tasks are re-requested only when no
+    // task of that kind is still waiting for a container
+    List<ResourceRequest> ask = null;
+    if (isAMContainerRunning && mapFinished != mapTotal) {
+      // map phase
+      if (! pendingMaps.isEmpty()) {
+        ask = packageRequests(pendingMaps, PRIORITY_MAP);
+        LOG.debug(MessageFormat.format("Application {0} sends out " +
+                "request for {1} mappers.", appId, pendingMaps.size()));
+        scheduledMaps.addAll(pendingMaps);
+        pendingMaps.clear();
+      } else if (scheduledMaps.isEmpty() && ! pendingFailedMaps.isEmpty()) {
+        ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
+        LOG.debug(MessageFormat.format("Application {0} sends out " +
+                "requests for {1} failed mappers.", appId,
+                pendingFailedMaps.size()));
+        scheduledMaps.addAll(pendingFailedMaps);
+        pendingFailedMaps.clear();
+      }
+    } else if (isAMContainerRunning && reduceFinished != reduceTotal) {
+      // reduce phase
+      if (! pendingReduces.isEmpty()) {
+        ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
+        LOG.debug(MessageFormat.format("Application {0} sends out " +
+                "requests for {1} reducers.", appId, pendingReduces.size()));
+        scheduledReduces.addAll(pendingReduces);
+        pendingReduces.clear();
+      } else if (scheduledReduces.isEmpty()
+              && ! pendingFailedReduces.isEmpty()) {
+        ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
+        LOG.debug(MessageFormat.format("Application {0} sends out " +
+                "request for {1} failed reducers.", appId,
+                pendingFailedReduces.size()));
+        scheduledReduces.addAll(pendingFailedReduces);
+        pendingFailedReduces.clear();
+      }
+    }
+    if (ask == null) {
+      ask = new ArrayList<ResourceRequest>();
+    }
+    // progress is the share of finished containers (1.0 without any)
+    final AllocateRequest request = createAllocateRequest(ask);
+    request.setProgress(totalContainers == 0
+            ? 1.0f : (float) finishedContainers / totalContainers);
+    // talk to the RM as this application attempt, carrying its AMRM token
+    UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+            .get(appAttemptId.getApplicationId())
+            .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+            new PrivilegedExceptionAction<AllocateResponse>() {
+      @Override
+      public AllocateResponse run() throws Exception {
+        return rm.getApplicationMasterService().allocate(request);
+      }
+    });
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) { // stamp the end time with the current wall clock
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public void lastStep() {
+    super.lastStep();
+    // drop every reference this AM still holds: the per-state task
+    // collections first, then the complete lists and the queued responses
+    pendingMaps.clear();
+    pendingFailedMaps.clear();
+    scheduledMaps.clear();
+    assignedMaps.clear();
+    pendingReduces.clear();
+    pendingFailedReduces.clear();
+    scheduledReduces.clear();
+    assignedReduces.clear();
+    allMaps.clear();
+    allReduces.clear();
+    responseQueue.clear();
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.conf;
+
+/** Names and default values of the "yarn.sls.*" configuration properties. */
+public class SLSConfiguration {
+  // root prefix shared by every key below
+  public static final String PREFIX = "yarn.sls.";
+  // runner
+  public static final String RUNNER_PREFIX = PREFIX + "runner.";
+  public static final String RUNNER_POOL_SIZE = RUNNER_PREFIX + "pool.size";
+  public static final int RUNNER_POOL_SIZE_DEFAULT = 10;
+  // scheduler
+  public static final String SCHEDULER_PREFIX = PREFIX + "scheduler.";
+  public static final String RM_SCHEDULER = SCHEDULER_PREFIX + "class";
+  // metrics
+  public static final String METRICS_PREFIX = PREFIX + "metrics.";
+  public static final String METRICS_SWITCH = METRICS_PREFIX + "switch";
+  public static final String METRICS_OUTPUT_DIR = METRICS_PREFIX + "output";
+  public static final String METRICS_WEB_ADDRESS_PORT =
+          METRICS_PREFIX + "web.address.port";
+  public static final int METRICS_WEB_ADDRESS_PORT_DEFAULT = 10001;
+  public static final String METRICS_TIMER_WINDOW_SIZE =
+          METRICS_PREFIX + "timer.window.size";
+  public static final int METRICS_TIMER_WINDOW_SIZE_DEFAULT = 100;
+  public static final String METRICS_RECORD_INTERVAL_MS =
+          METRICS_PREFIX + "record.interval.ms";
+  public static final int METRICS_RECORD_INTERVAL_MS_DEFAULT = 1000;
+  // nm
+  public static final String NM_PREFIX = PREFIX + "nm.";
+  public static final String NM_MEMORY_MB = NM_PREFIX + "memory.mb";
+  public static final int NM_MEMORY_MB_DEFAULT = 10240;
+  public static final String NM_VCORES = NM_PREFIX + "vcores";
+  public static final int NM_VCORES_DEFAULT = 10;
+  public static final String NM_HEARTBEAT_INTERVAL_MS =
+          NM_PREFIX + "heartbeat.interval.ms";
+  public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+  // am
+  public static final String AM_PREFIX = PREFIX + "am.";
+  public static final String AM_HEARTBEAT_INTERVAL_MS =
+          AM_PREFIX + "heartbeat.interval.ms";
+  public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+  public static final String AM_TYPE = AM_PREFIX + "type.";
+  // note: AM_TYPE ends with a dot, i.e. it is a prefix and not a full key
+  // container
+  public static final String CONTAINER_PREFIX = PREFIX + "container.";
+  public static final String CONTAINER_MEMORY_MB =
+          CONTAINER_PREFIX + "memory.mb";
+  public static final int CONTAINER_MEMORY_MB_DEFAULT = 1024;
+  public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
+  public static final int CONTAINER_VCORES_DEFAULT = 1;
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.nodemanager;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+        .RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+        .RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+
+public class NMSimulator extends TaskRunner.Task {
+  // the simulated node, created through NodeInfo in init()
+  private RMNode node;
+  // NM token master key taken from the RM's registration response
+  private MasterKey masterKey;
+  // containers grouped by state; all collections are created in init()
+  private List<ContainerId> completedContainerList;
+  private List<ContainerId> releasedContainerList;
+  private DelayQueue<ContainerSimulator> containerQueue;
+  private Map<ContainerId, ContainerSimulator> runningContainers;
+  private List<ContainerId> amContainerList;
+  // resource manager this node registers with and sends heartbeats to
+  private ResourceManager rm;
+  // response id of the next heartbeat; incremented with every heartbeat
+  private int RESPONSE_ID = 1;
+  private final static Logger LOG = Logger.getLogger(NMSimulator.class);
+  
+  /** Creates the simulated node and registers it with the given RM. */
+  public void init(String nodeIdStr, int memory, int cores,
+          int dispatchTime, int heartBeatInterval, ResourceManager rm)
+          throws IOException, YarnException {
+    super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
+            heartBeatInterval);
+    // build the node from the two name parts SLSUtils extracts from its id
+    String[] rackAndHost = SLSUtils.getRackHostName(nodeIdStr);
+    this.node = NodeInfo.newNodeInfo(rackAndHost[0], rackAndHost[1],
+            BuilderUtils.newResource(memory, cores));
+    this.rm = rm;
+    // container book-keeping uses synchronized / concurrent collections
+    runningContainers =
+            new ConcurrentHashMap<ContainerId, ContainerSimulator>();
+    containerQueue = new DelayQueue<ContainerSimulator>();
+    amContainerList =
+            Collections.synchronizedList(new ArrayList<ContainerId>());
+    completedContainerList =
+            Collections.synchronizedList(new ArrayList<ContainerId>());
+    releasedContainerList =
+            Collections.synchronizedList(new ArrayList<ContainerId>());
+    // register NM with RM and keep the NM token master key it returns
+    RegisterNodeManagerRequest registration =
+            Records.newRecord(RegisterNodeManagerRequest.class);
+    registration.setNodeId(node.getNodeID());
+    registration.setResource(node.getTotalCapability());
+    registration.setHttpPort(80);
+    masterKey = rm.getResourceTrackerService()
+            .registerNodeManager(registration).getNMTokenMasterKey();
+  }
+
+  @Override
+  public void firstStep() throws YarnException, IOException {
+    // nothing to do here; the node is already registered by init()
+  }
+
+  /** Completes expired containers and exchanges one heartbeat with the RM. */
+  @Override
+  public void middleStep() {
+    // the delay queue only hands out containers whose delay has expired
+    ContainerSimulator cs = null;
+    synchronized(completedContainerList) {
+      while ((cs = containerQueue.poll()) != null) {
+        runningContainers.remove(cs.getId());
+        completedContainerList.add(cs.getId());
+        LOG.debug(MessageFormat.format("Container {0} has completed",
+                cs.getId()));
+      }
+    }
+    
+    // send heart beat
+    NodeHeartbeatRequest beatRequest =
+            Records.newRecord(NodeHeartbeatRequest.class);
+    beatRequest.setLastKnownNMTokenMasterKey(masterKey);
+    NodeStatus ns = Records.newRecord(NodeStatus.class);
+    ns.setContainersStatuses(generateContainerStatusList());
+    ns.setNodeId(node.getNodeID());
+    ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
+    ns.setResponseId(RESPONSE_ID ++);
+    ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
+    beatRequest.setNodeStatus(ns);
+    try {
+      NodeHeartbeatResponse beatResponse =
+              rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
+      if (! beatResponse.getContainersToCleanup().isEmpty()) {
+        // stop tracking the containers the RM wants to be cleaned up
+        synchronized(releasedContainerList) {
+          for (ContainerId containerId : beatResponse.getContainersToCleanup()){
+            if (amContainerList.contains(containerId)) {
+              // AM container (not killed?, only release)
+              synchronized(amContainerList) {
+                amContainerList.remove(containerId);
+              }
+              LOG.debug(MessageFormat.format("NodeManager {0} releases " +
+                      "an AM ({1}).", node.getNodeID(), containerId));
+            } else {
+              cs = runningContainers.remove(containerId);
+              containerQueue.remove(cs);
+              releasedContainerList.add(containerId);
+              LOG.debug(MessageFormat.format("NodeManager {0} releases a " +
+                      "container ({1}).", node.getNodeID(), containerId));
+            }
+          }
+        }
+      }
+      if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) {
+        lastStep();
+      }
+    } catch (YarnException e) {
+      LOG.error("Heartbeat of node " + node.getNodeID() + " failed", e);
+    } catch (IOException e) {
+      LOG.error("Heartbeat of node " + node.getNodeID() + " failed", e);
+    }
+  }
+
+  @Override
+  public void lastStep() {
+    // nothing to clean up; also invoked by middleStep() on SHUTDOWN
+  }
+
+  /**
+   * Collects the status of every container on this node for a heartbeat.
+   */
+  private ArrayList<ContainerStatus> generateContainerStatusList() {
+    ArrayList<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+    // task containers and AM containers are both reported as RUNNING
+    for (ContainerSimulator running : runningContainers.values()) {
+      statuses.add(newContainerStatus(running.getId(),
+              ContainerState.RUNNING, ContainerExitStatus.SUCCESS));
+    }
+    synchronized(amContainerList) {
+      for (ContainerId amId : amContainerList) {
+        statuses.add(newContainerStatus(amId,
+                ContainerState.RUNNING, ContainerExitStatus.SUCCESS));
+      }
+    }
+    // completed containers are reported once, hence the clear() afterwards
+    synchronized(completedContainerList) {
+      for (ContainerId doneId : completedContainerList) {
+        LOG.debug(MessageFormat.format("NodeManager {0} completed" +
+                " container ({1}).", node.getNodeID(), doneId));
+        statuses.add(newContainerStatus(doneId,
+                ContainerState.COMPLETE, ContainerExitStatus.SUCCESS));
+      }
+      completedContainerList.clear();
+    }
+    // released containers are reported once as well, but as ABORTED
+    synchronized(releasedContainerList) {
+      for (ContainerId releasedId : releasedContainerList) {
+        LOG.debug(MessageFormat.format("NodeManager {0} released container" +
+                " ({1}).", node.getNodeID(), releasedId));
+        statuses.add(newContainerStatus(releasedId,
+                ContainerState.COMPLETE, ContainerExitStatus.ABORTED));
+      }
+      releasedContainerList.clear();
+    }
+    return statuses;
+  }
+
+  /** Creates a status record from a container id, state and exit status. */
+  private ContainerStatus newContainerStatus(ContainerId cId,
+          ContainerState state, int exitState) {
+    ContainerStatus status = Records.newRecord(ContainerStatus.class);
+    status.setContainerId(cId);
+    status.setState(state);
+    status.setExitStatus(exitState);
+    return status;
+  }
+
+  public RMNode getNode() {
+    return node; // assigned by init(); null before init() has run
+  }
+
+  /**
+   * launch a new container with the given life time; -1 denotes an AM
+   */
+  public void addNewContainer(Container container, long lifeTimeMS) {
+    LOG.debug(MessageFormat.format("NodeManager {0} launches a new " +
+            "container ({1}).", node.getNodeID(), container.getId()));
+    if (lifeTimeMS == -1) {
+      // AM containers are only remembered by id; they never enter the queue
+      synchronized(amContainerList) {
+        amContainerList.add(container.getId());
+      }
+      return;
+    }
+    // normal container: it is queued and tracked as running; the third
+    // constructor argument is the current time plus the life time
+    ContainerSimulator task = new ContainerSimulator(container.getId(),
+            container.getResource(), lifeTimeMS + System.currentTimeMillis(),
+            lifeTimeMS);
+    containerQueue.add(task);
+    runningContainers.put(task.getId(), task);
+  }
+
+  /**
+   * drop a container from the AM list and add it to the completed list
+   * @param containerId id of the container to be cleaned
+   */
+  public void cleanupContainer(ContainerId containerId) {
+    synchronized(amContainerList) {
+      amContainerList.remove(containerId);
+    }
+    synchronized(completedContainerList) {
+      completedContainerList.add(containerId);
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.nodemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+        .UpdatedContainerInfo;
+
+/**
+ * Factory for lightweight {@link RMNode} instances backed by fixed values
+ * supplied at construction time.
+ */
+public class NodeInfo {
+  // port number handed out by newNodeInfo(rack, host, resource); the
+  // increment is a plain, unsynchronized static update
+  private static int NODE_ID = 0;
+
+  /**
+   * Builds a {@link NodeId} for the given host and port.
+   *
+   * @param host host name of the node
+   * @param port port of the node
+   * @return the result of {@code NodeId.newInstance(host, port)}
+   */
+  public static NodeId newNodeID(String host, int port) {
+    return NodeId.newInstance(host, port);
+  }
+
+  /**
+   * {@link RMNode} implementation that simply returns the values it was
+   * constructed with. Both clean-up lists start empty.
+   */
+  private static class FakeRMNodeImpl implements RMNode {
+    private final NodeId nodeId;
+    private final String hostName;
+    private final String nodeAddr;
+    private final String httpAddress;
+    private final int cmdPort;
+    private final Resource perNode;
+    private final String rackName;
+    private final String healthReport;
+    private final NodeState state;
+    private final List<ContainerId> toCleanUpContainers;
+    private final List<ApplicationId> toCleanUpApplications;
+    
+    public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+        Resource perNode, String rackName, String healthReport,
+        int cmdPort, String hostName, NodeState state) {
+      this.nodeId = nodeId;
+      this.nodeAddr = nodeAddr;
+      this.httpAddress = httpAddress;
+      this.perNode = perNode;
+      this.rackName = rackName;
+      this.healthReport = healthReport;
+      this.cmdPort = cmdPort;
+      this.hostName = hostName;
+      this.state = state;
+      toCleanUpApplications = new ArrayList<ApplicationId>();
+      toCleanUpContainers = new ArrayList<ContainerId>();
+    }
+
+    @Override
+    public NodeId getNodeID() {
+      return nodeId;
+    }
+    
+    @Override
+    public String getHostName() {
+      return hostName;
+    }
+    
+    @Override
+    public int getCommandPort() {
+      return cmdPort;
+    }
+    
+    /** Always 0; no HTTP port is stored. */
+    @Override
+    public int getHttpPort() {
+      return 0;
+    }
+
+    @Override
+    public String getNodeAddress() {
+      return nodeAddr;
+    }
+
+    @Override
+    public String getHttpAddress() {
+      return httpAddress;
+    }
+
+    @Override
+    public String getHealthReport() {
+      return healthReport;
+    }
+
+    /** Always 0; no health-report timestamp is stored. */
+    @Override
+    public long getLastHealthReportTime() {
+      return 0; 
+    }
+
+    @Override
+    public Resource getTotalCapability() {
+      return perNode;
+    }
+
+    @Override
+    public String getRackName() {
+      return rackName;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Node getNode() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public NodeState getState() {
+      return state;
+    }
+
+    /** Returns the internal (mutable) list itself, not a copy. */
+    @Override
+    public List<ContainerId> getContainersToCleanUp() {
+      return toCleanUpContainers;
+    }
+
+    /** Returns the internal (mutable) list itself, not a copy. */
+    @Override
+    public List<ApplicationId> getAppsToCleanup() {
+      return toCleanUpApplications;
+    }
+
+    /** Intentionally a no-op. */
+    @Override
+    public void updateNodeHeartbeatResponseForCleanup(
+            NodeHeartbeatResponse response) {
+    }
+
+    /** Always {@code null}; no heartbeat response is kept. */
+    @Override
+    public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
+      return null;
+    }
+
+    /**
+     * Builds a single {@link UpdatedContainerInfo} from the containers
+     * currently in the clean-up list.
+     *
+     * @return a new one-element list; the first constructor argument of the
+     *         element is an empty list, the second holds one status per
+     *         container to clean up
+     */
+    @Override
+    public List<UpdatedContainerInfo> pullContainerUpdates() {
+      ArrayList<UpdatedContainerInfo> updates =
+          new ArrayList<UpdatedContainerInfo>();
+      
+      // NOTE(review): the statuses carry state RUNNING with exit status
+      // SUCCESS and are passed as the second constructor argument -
+      // confirm this combination is what consumers expect.
+      ArrayList<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+      for(ContainerId cId : this.toCleanUpContainers) {
+        statuses.add(ContainerStatus.newInstance(cId, ContainerState.RUNNING, "", 
+          ContainerExitStatus.SUCCESS));
+      }
+      updates.add(new UpdatedContainerInfo(new ArrayList<ContainerStatus>(), 
+        statuses));
+      return updates;
+    }
+  }
+  
+  /**
+   * Creates a fake node. The node address is {@code hostName:port}, the HTTP
+   * address is the bare host name, the command port is {@code port}, the
+   * health report is a fixed string and the node state is {@code null}.
+   *
+   * @param rackName rack the node reports
+   * @param hostName host name of the node
+   * @param resource total capability the node reports
+   * @param port port used for the node id, node address and command port
+   * @return the new node
+   */
+  public static RMNode newNodeInfo(String rackName, String hostName,
+                              final Resource resource, int port) {
+    final NodeId nodeId = newNodeID(hostName, port);
+    final String nodeAddr = hostName + ":" + port;
+    final String httpAddress = hostName;
+    
+    return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
+        resource, rackName, "Me good",
+        port, hostName, null);
+  }
+  
+  /**
+   * Creates a fake node using the next value of the internal counter as its
+   * port; the counter is incremented on every call.
+   *
+   * @param rackName rack the node reports
+   * @param hostName host name of the node
+   * @param resource total capability the node reports
+   * @return the new node
+   */
+  public static RMNode newNodeInfo(String rackName, String hostName,
+                              final Resource resource) {
+    return newNodeInfo(rackName, hostName, resource, NODE_ID++);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/CapacitySchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+/**
+ * {@link SchedulerMetrics} variant whose queue tracking only records the
+ * queue name; it registers no gauges of its own.
+ */
+public class CapacitySchedulerMetrics extends SchedulerMetrics {
+
+  public CapacitySchedulerMetrics() {
+    // nothing to do beyond the parent's no-argument constructor
+  }
+
+  /**
+   * Adds the queue name to {@code trackedQueues}.
+   *
+   * @param queueName name of the queue to track
+   */
+  @Override
+  public void trackQueue(String queueName) {
+    this.trackedQueues.add(queueName);
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class ContainerSimulator implements Delayed {
+  // id
+  private ContainerId id;
+  // resource allocated
+  private Resource resource;
+  // end time
+  private long endTime;
+  // life time (ms)
+  private long lifeTime;
+  // host name
+  private String hostname;
+  // priority
+  private int priority;
+  // type 
+  private String type;
+
+  /**
+   * invoked when AM schedules containers to allocate
+   */
+  public ContainerSimulator(Resource resource, long lifeTime,
+      String hostname, int priority, String type) {
+    this.resource = resource;
+    this.lifeTime = lifeTime;
+    this.hostname = hostname;
+    this.priority = priority;
+    this.type = type;
+  }
+
+  /**
+   * invoke when NM schedules containers to run
+   */
+  public ContainerSimulator(ContainerId id, Resource resource, long endTime,
+      long lifeTime) {
+    this.id = id;
+    this.resource = resource;
+    this.endTime = endTime;
+    this.lifeTime = lifeTime;
+  }
+  
+  public Resource getResource() {
+    return resource;
+  }
+  
+  public ContainerId getId() {
+    return id;
+  }
+
+  @Override
+  public int compareTo(Delayed o) {
+    if (!(o instanceof ContainerSimulator)) {
+      throw new IllegalArgumentException(
+              "Parameter must be a ContainerSimulator instance");
+    }
+    ContainerSimulator other = (ContainerSimulator) o;
+    return (int) Math.signum(endTime - other.endTime);
+  }
+
+  @Override
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(endTime - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+  }
+  
+  public long getLifeTime() {
+    return lifeTime;
+  }
+  
+  public String getHostname() {
+    return hostname;
+  }
+  
+  public long getEndTime() {
+    return endTime;
+  }
+  
+  public int getPriority() {
+    return priority;
+  }
+  
+  public String getType() {
+    return type;
+  }
+  
+  public void setPriority(int p) {
+    priority = p;
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+        .AppSchedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+        .FairScheduler;
+
+import com.codahale.metrics.Gauge;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+
+/**
+ * Scheduler metrics for a {@link FairScheduler}. For every tracked
+ * application and queue it registers integer gauges for demand, usage,
+ * min share, max share and fair share, each split into memory and vcores,
+ * under the names {@code variable.app.<id>.<metric>} and
+ * {@code variable.queue.<name>.<metric>}.
+ */
+public class FairSchedulerMetrics extends SchedulerMetrics {
+
+  // Upper bounds applied to the reported max share. They stay at
+  // Integer.MAX_VALUE (i.e. no cap) until the cluster totals are computed by
+  // the queue "maxshare.memory" gauge.
+  private int totalMemoryMB = Integer.MAX_VALUE;
+  private int totalVCores = Integer.MAX_VALUE;
+  // true once totalMemoryMB / totalVCores have been computed
+  private boolean maxReset = false;
+
+  public FairSchedulerMetrics() {
+    super();
+    appTrackedMetrics.add("demand.memory");
+    appTrackedMetrics.add("demand.vcores");
+    appTrackedMetrics.add("usage.memory");
+    appTrackedMetrics.add("usage.vcores");
+    appTrackedMetrics.add("minshare.memory");
+    appTrackedMetrics.add("minshare.vcores");
+    appTrackedMetrics.add("maxshare.memory");
+    appTrackedMetrics.add("maxshare.vcores");
+    appTrackedMetrics.add("fairshare.memory");
+    appTrackedMetrics.add("fairshare.vcores");
+    queueTrackedMetrics.add("demand.memory");
+    queueTrackedMetrics.add("demand.vcores");
+    queueTrackedMetrics.add("usage.memory");
+    queueTrackedMetrics.add("usage.vcores");
+    queueTrackedMetrics.add("minshare.memory");
+    queueTrackedMetrics.add("minshare.vcores");
+    queueTrackedMetrics.add("maxshare.memory");
+    queueTrackedMetrics.add("maxshare.vcores");
+    queueTrackedMetrics.add("fairshare.memory");
+    queueTrackedMetrics.add("fairshare.vcores");
+  }
+  
+  /**
+   * Tracks an application and registers its ten gauges, named
+   * {@code variable.app.<oldAppId>.<metric>}.
+   *
+   * @param appAttemptId attempt used to look up the application in the
+   *                     scheduler
+   * @param oldAppId id used in the metric names
+   */
+  @Override
+  public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
+    super.trackApp(appAttemptId, oldAppId);
+    FairScheduler fair = (FairScheduler) scheduler;
+    final AppSchedulable app = fair.getSchedulerApp(appAttemptId)
+            .getAppSchedulable();
+    metrics.register("variable.app." + oldAppId + ".demand.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getDemand().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".demand.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getDemand().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".usage.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getResourceUsage().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".usage.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getResourceUsage().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".minshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getMinShare().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".minshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          // vcores metric must report vcores, not memory
+          return app.getMinShare().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".maxshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return Math.min(app.getMaxShare().getMemory(), totalMemoryMB);
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".maxshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return Math.min(app.getMaxShare().getVirtualCores(), totalVCores);
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".fairshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          // memory metric must report memory, not vcores
+          return app.getFairShare().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.app." + oldAppId + ".fairshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return app.getFairShare().getVirtualCores();
+        }
+      }
+    );
+  }
+
+  /**
+   * Tracks a queue and registers its ten gauges, named
+   * {@code variable.queue.<queueName>.<metric>}.
+   * <p>
+   * The {@code maxshare.memory} gauge also computes the cluster totals used
+   * to cap every max-share gauge: once {@code SLSRunner.simulateInfoMap}
+   * contains the node count, per-node memory and per-node vcores, the totals
+   * are calculated a single time.
+   *
+   * @param queueName name of the queue to track
+   */
+  @Override
+  public void trackQueue(String queueName) {
+    trackedQueues.add(queueName);
+    FairScheduler fair = (FairScheduler) scheduler;
+    final FSQueue queue = fair.getQueueManager().getQueue(queueName);
+    metrics.register("variable.queue." + queueName + ".demand.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getDemand().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".demand.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getDemand().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".usage.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getResourceUsage().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".usage.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getResourceUsage().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".minshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getMinShare().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".minshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getMinShare().getVirtualCores();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".maxshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if (! maxReset &&
+                  SLSRunner.simulateInfoMap.containsKey("Number of nodes") &&
+                  SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") &&
+                  SLSRunner.simulateInfoMap.containsKey("Node VCores")) {
+            int numNMs = Integer.parseInt(
+                  SLSRunner.simulateInfoMap.get("Number of nodes").toString());
+            int numMemoryMB = Integer.parseInt(
+                  SLSRunner.simulateInfoMap.get("Node memory (MB)").toString());
+            int numVCores = Integer.parseInt(
+                  SLSRunner.simulateInfoMap.get("Node VCores").toString());
+
+            totalMemoryMB = numNMs * numMemoryMB;
+            totalVCores = numNMs * numVCores;
+            // remember the totals are in place so later reads skip the
+            // look-ups and parsing above
+            maxReset = true;
+          }
+
+          return Math.min(queue.getMaxShare().getMemory(), totalMemoryMB);
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".maxshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return Math.min(queue.getMaxShare().getVirtualCores(), totalVCores);
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".fairshare.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getFairShare().getMemory();
+        }
+      }
+    );
+    metrics.register("variable.queue." + queueName + ".fairshare.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return queue.getFairShare().getVirtualCores();
+        }
+      }
+    );
+  }
+
+  /**
+   * Stops tracking a queue and removes the ten gauges registered for it by
+   * {@link #trackQueue(String)}.
+   *
+   * @param queueName name of the queue to stop tracking
+   */
+  @Override
+  public void untrackQueue(String queueName) {
+    trackedQueues.remove(queueName);
+    metrics.remove("variable.queue." + queueName + ".demand.memory");
+    metrics.remove("variable.queue." + queueName + ".demand.vcores");
+    metrics.remove("variable.queue." + queueName + ".usage.memory");
+    metrics.remove("variable.queue." + queueName + ".usage.vcores");
+    metrics.remove("variable.queue." + queueName + ".minshare.memory");
+    metrics.remove("variable.queue." + queueName + ".minshare.vcores");
+    metrics.remove("variable.queue." + queueName + ".maxshare.memory");
+    metrics.remove("variable.queue." + queueName + ".maxshare.vcores");
+    metrics.remove("variable.queue." + queueName + ".fairshare.memory");
+    metrics.remove("variable.queue." + queueName + ".fairshare.vcores");
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FifoSchedulerMetrics.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
+        .FifoScheduler;
+
+import com.codahale.metrics.Gauge;
+
+/**
+ * Scheduler metrics for a {@link FifoScheduler}: registers capacity gauges
+ * for each tracked queue.
+ */
+public class FifoSchedulerMetrics extends SchedulerMetrics {
+  
+  public FifoSchedulerMetrics() {
+    super();
+  }
+
+  /**
+   * Tracks a queue and registers two float gauges for it:
+   * {@code variable.queue.<queueName>.currentcapacity} and
+   * {@code variable.queue.<queueName>.maximumcapacity}.
+   *
+   * @param queueName name of the queue to track
+   */
+  @Override
+  public void trackQueue(String queueName) {
+    trackedQueues.add(queueName);
+    FifoScheduler fifo = (FifoScheduler) scheduler;
+    // for FifoScheduler, only DEFAULT_QUEUE
+    // here the two boolean parameters don't affect results
+    // NOTE(review): both gauges read this single QueueInfo obtained at
+    // registration time - confirm it reflects live scheduler state rather
+    // than a snapshot.
+    final QueueInfo queue = fifo.getQueueInfo(queueName, false, false);
+    // track currentCapacity, maximumCapacity (always 1.0f)
+    metrics.register("variable.queue." + queueName + ".currentcapacity",
+      new Gauge<Float>() {
+        @Override
+        public Float getValue() {
+          return queue.getCurrentCapacity();
+        }
+      }
+    );
+    // previously registered under an empty metric suffix and duplicated the
+    // current capacity
+    metrics.register("variable.queue." + queueName + ".maximumcapacity",
+      new Gauge<Float>() {
+        @Override
+        public Float getValue() {
+          return queue.getMaximumCapacity();
+        }
+      }
+    );
+  }
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+        .NodeUpdateSchedulerEvent;
+
+/**
+ * A {@link NodeUpdateSchedulerEvent} whose node is the original event's node
+ * wrapped in an {@link RMNodeWrapper}.
+ */
+public class NodeUpdateSchedulerEventWrapper extends NodeUpdateSchedulerEvent {
+
+  /**
+   * @param event the event whose node is wrapped
+   */
+  public NodeUpdateSchedulerEventWrapper(NodeUpdateSchedulerEvent event) {
+    super(wrapNodeOf(event));
+  }
+
+  /** Wraps the node carried by {@code event} in a new RMNodeWrapper. */
+  private static RMNodeWrapper wrapNodeOf(NodeUpdateSchedulerEvent event) {
+    return new RMNodeWrapper(event.getRMNode());
+  }
+
+}

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java?rev=1527059&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java Fri Sep 27 20:23:19 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+        .UpdatedContainerInfo;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link RMNode} decorator that pulls the wrapped node's container updates
+ * once, at construction time, and keeps them. Every other call is delegated
+ * to the wrapped node unchanged.
+ */
+public class RMNodeWrapper implements RMNode {
+  private final RMNode node;
+  // updates pulled from the wrapped node when this wrapper was created
+  private final List<UpdatedContainerInfo> updates;
+  // true once pullContainerUpdates() has handed out the stored updates
+  private boolean pulled = false;
+  
+  /**
+   * Wraps {@code node} and immediately calls its
+   * {@code pullContainerUpdates()}, storing the result.
+   *
+   * @param node the node to wrap
+   */
+  public RMNodeWrapper(RMNode node) {
+    this.node = node;
+    updates = node.pullContainerUpdates();
+  }
+  
+  @Override
+  public NodeId getNodeID() {
+    return node.getNodeID();
+  }
+
+  @Override
+  public String getHostName() {
+    return node.getHostName();
+  }
+
+  @Override
+  public int getCommandPort() {
+    return node.getCommandPort();
+  }
+
+  @Override
+  public int getHttpPort() {
+    return node.getHttpPort();
+  }
+
+  @Override
+  public String getNodeAddress() {
+    return node.getNodeAddress();
+  }
+
+  @Override
+  public String getHttpAddress() {
+    return node.getHttpAddress();
+  }
+
+  @Override
+  public String getHealthReport() {
+    return node.getHealthReport();
+  }
+
+  @Override
+  public long getLastHealthReportTime() {
+    return node.getLastHealthReportTime();
+  }
+
+  @Override
+  public Resource getTotalCapability() {
+    return node.getTotalCapability();
+  }
+
+  @Override
+  public String getRackName() {
+    return node.getRackName();
+  }
+
+  @Override
+  public Node getNode() {
+    return node.getNode();
+  }
+
+  @Override
+  public NodeState getState() {
+    return node.getState();
+  }
+
+  @Override
+  public List<ContainerId> getContainersToCleanUp() {
+    return node.getContainersToCleanUp();
+  }
+
+  @Override
+  public List<ApplicationId> getAppsToCleanup() {
+    return node.getAppsToCleanup();
+  }
+
+  @Override
+  public void updateNodeHeartbeatResponseForCleanup(
+          NodeHeartbeatResponse nodeHeartbeatResponse) {
+    node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
+  }
+
+  @Override
+  public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
+    return node.getLastNodeHeartBeatResponse();
+  }
+
+  /**
+   * Hands out the stored updates exactly once.
+   *
+   * @return the updates captured at construction on the first call; the
+   *         shared immutable empty list on every later call
+   */
+  @Override
+  public List<UpdatedContainerInfo> pullContainerUpdates() {
+    if (pulled) {
+      return Collections.<UpdatedContainerInfo>emptyList();
+    }
+    pulled = true;
+    return updates;
+  }
+  
+  /**
+   * @return the updates captured at construction, regardless of whether
+   *         they have already been pulled
+   */
+  List<UpdatedContainerInfo> getContainerUpdates() {
+    return updates;
+  }
+
+}