You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2013/12/19 01:08:00 UTC

svn commit: r1552173 - in /hadoop/common/trunk/hadoop-tools/hadoop-sls/src: main/java/org/apache/hadoop/yarn/sls/ main/java/org/apache/hadoop/yarn/sls/appmaster/ main/java/org/apache/hadoop/yarn/sls/scheduler/ main/java/org/apache/hadoop/yarn/sls/web/ ...

Author: cdouglas
Date: Thu Dec 19 00:07:59 2013
New Revision: 1552173

URL: http://svn.apache.org/r1552173
Log:
YARN-1471. Preserve scheduler typeinfo in simulator to work with resource monitors.
Contributed by Carlo Curino.


Added:
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java   (with props)
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java   (with props)
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml   (with props)
Modified:
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
    hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml

Modified: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java?rev=1552173&r1=1552172&r2=1552173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java Thu Dec 19 00:07:59 2013
@@ -40,12 +40,15 @@ import org.apache.hadoop.tools.rumen.Log
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import  org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -140,9 +143,9 @@ public class SLSRunner {
     // start application masters
     startAM();
     // set queue & tracked apps information
-    ((ResourceSchedulerWrapper) rm.getResourceScheduler())
+    ((SchedulerWrapper) rm.getResourceScheduler())
                             .setQueueSet(this.queueAppNumMap.keySet());
-    ((ResourceSchedulerWrapper) rm.getResourceScheduler())
+    ((SchedulerWrapper) rm.getResourceScheduler())
                             .setTrackedAppSet(this.trackedApps);
     // print out simulation info
     printSimulationInfo();
@@ -151,13 +154,24 @@ public class SLSRunner {
     // starting the runner once everything is ready to go,
     runner.start();
   }
-  
+
   private void startRM() throws IOException, ClassNotFoundException {
     Configuration rmConf = new YarnConfiguration();
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
-    rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
-    rmConf.set(YarnConfiguration.RM_SCHEDULER,
-            ResourceSchedulerWrapper.class.getName());
+
+    // For CapacityScheduler we use a sub-classing instead of wrapping
+    // to allow scheduler-specific invocations from monitors to work
+    // this can be used for other schedulers as well if we care to
+    // exercise/track behaviors that are not common to the scheduler api
+    if(Class.forName(schedulerClass) == CapacityScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSCapacityScheduler.class.getName());
+    } else {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+              ResourceSchedulerWrapper.class.getName());
+      rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
+    }
+
     rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
     rm = new ResourceManager();
     rm.init(rmConf);

Modified: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java?rev=1552173&r1=1552172&r2=1552173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java Thu Dec 19 00:07:59 2013
@@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.util.Recor
 import org.apache.log4j.Logger;
 
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
-import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@@ -193,7 +193,7 @@ public abstract class AMSimulator extend
     simulateFinishTimeMS = System.currentTimeMillis() -
         SLSRunner.getRunner().getStartTimeMS();
     // record job running information
-    ((ResourceSchedulerWrapper)rm.getResourceScheduler())
+    ((SchedulerWrapper)rm.getResourceScheduler())
          .addAMRuntime(appId, 
                       traceStartTimeMS, traceFinishTimeMS, 
                       simulateStartTimeMS, simulateFinishTimeMS);
@@ -314,13 +314,13 @@ public abstract class AMSimulator extend
 
   private void trackApp() {
     if (isTracked) {
-      ((ResourceSchedulerWrapper) rm.getResourceScheduler())
+      ((SchedulerWrapper) rm.getResourceScheduler())
               .addTrackedApp(appAttemptId, oldAppId);
     }
   }
   public void untrackApp() {
     if (isTracked) {
-      ((ResourceSchedulerWrapper) rm.getResourceScheduler())
+      ((SchedulerWrapper) rm.getResourceScheduler())
               .removeTrackedApp(appAttemptId, oldAppId);
     }
   }

Modified: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java?rev=1552173&r1=1552172&r2=1552173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java Thu Dec 19 00:07:59 2013
@@ -85,8 +85,8 @@ import com.codahale.metrics.MetricRegist
 import com.codahale.metrics.SlidingWindowReservoir;
 import com.codahale.metrics.Timer;
 
-public class ResourceSchedulerWrapper implements ResourceScheduler,
-        Configurable {
+public class ResourceSchedulerWrapper implements
+        SchedulerWrapper,ResourceScheduler,Configurable {
   private static final String EOL = System.getProperty("line.separator");
   private static final int SAMPLING_SIZE = 60;
   private ScheduledExecutorService pool;
@@ -150,9 +150,8 @@ public class ResourceSchedulerWrapper im
   public void setConf(Configuration conf) {
     this.conf = conf;
     // set scheduler
-    Class<? extends ResourceScheduler> klass =
-            conf.getClass(SLSConfiguration.RM_SCHEDULER, null,
-                    ResourceScheduler.class);
+    Class<? extends ResourceScheduler> klass = conf.getClass(
+        SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class);
 
     scheduler = ReflectionUtils.newInstance(klass, conf);
     // start metrics
@@ -861,4 +860,3 @@ public class ResourceSchedulerWrapper im
     return scheduler.getAppsInQueue(queue);
   }
 }
-

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java?rev=1552173&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java Thu Dec 19 00:07:59 2013
@@ -0,0 +1,808 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+        .UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+        .ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+        .SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+        .SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+        .CapacityScheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+        .NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+        .SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+        .SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
+        .FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
+        .FifoScheduler;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SLSCapacityScheduler extends CapacityScheduler implements
+        SchedulerWrapper,Configurable {
+  private static final String EOL = System.getProperty("line.separator");
+  private static final int SAMPLING_SIZE = 60;
+  private ScheduledExecutorService pool;
+  // counters for scheduler allocate/handle operations
+  private Counter schedulerAllocateCounter;
+  private Counter schedulerHandleCounter;
+  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
+  // Timers for scheduler allocate/handle operations
+  private Timer schedulerAllocateTimer;
+  private Timer schedulerHandleTimer;
+  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
+  private List<Histogram> schedulerHistogramList;
+  private Map<Histogram, Timer> histogramTimerMap;
+  private Lock samplerLock;
+  private Lock queueLock;
+
+  private Configuration conf;
+ 
+  private Map<ApplicationAttemptId, String> appQueueMap =
+          new ConcurrentHashMap<ApplicationAttemptId, String>();
+  private BufferedWriter jobRuntimeLogBW;
+
+  // Priority of the ResourceSchedulerWrapper shutdown hook.
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  // web app
+  private SLSWebApp web;
+
+  private Map<ContainerId, Resource> preemptionContainerMap =
+          new ConcurrentHashMap<ContainerId, Resource>();
+
+  // metrics
+  private MetricRegistry metrics;
+  private SchedulerMetrics schedulerMetrics;
+  private boolean metricsON;
+  private String metricsOutputDir;
+  private BufferedWriter metricsLogBW;
+  private boolean running = false;
+  private static Map<Class, Class> defaultSchedulerMetricsMap =
+          new HashMap<Class, Class>();
+  static {
+    defaultSchedulerMetricsMap.put(FairScheduler.class,
+            FairSchedulerMetrics.class);
+    defaultSchedulerMetricsMap.put(FifoScheduler.class,
+            FifoSchedulerMetrics.class);
+    defaultSchedulerMetricsMap.put(CapacityScheduler.class,
+            CapacitySchedulerMetrics.class);
+  }
+  // must set by outside
+  private Set<String> queueSet;
+  private Set<String> trackedAppSet;
+
+  public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class);
+
+  public SLSCapacityScheduler() {
+    samplerLock = new ReentrantLock();
+    queueLock = new ReentrantLock();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    super.setConf(conf);
+    // start metrics
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (metricsON) {
+      try {
+        initMetrics();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          if (metricsLogBW != null)  {
+            metricsLogBW.write("]");
+            metricsLogBW.close();
+          }
+          if (web != null) {
+            web.stop();
+          }
+          tearDown();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }, SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  @Override
+  public Allocation allocate(ApplicationAttemptId attemptId,
+                             List<ResourceRequest> resourceRequests,
+                             List<ContainerId> containerIds,
+                             List<String> strings, List<String> strings2) {
+    if (metricsON) {
+      final Timer.Context context = schedulerAllocateTimer.time();
+      Allocation allocation = null;
+      try {
+        allocation = super.allocate(attemptId, resourceRequests,
+                containerIds, strings, strings2);
+        return allocation;
+      } finally {
+        context.stop();
+        schedulerAllocateCounter.inc();
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+                  resourceRequests, containerIds);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    } else {
+      return super.allocate(attemptId,
+              resourceRequests, containerIds, strings, strings2);
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent schedulerEvent) {
+	    // metrics off
+	    if (! metricsON) {
+	      super.handle(schedulerEvent);
+	      return;
+	    }
+	    if(!running)    running = true;
+
+	    // metrics on
+	    Timer.Context handlerTimer = null;
+	    Timer.Context operationTimer = null;
+
+	    NodeUpdateSchedulerEventWrapper eventWrapper;
+	    try {
+	      //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+	      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+	              && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+	        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+	                (NodeUpdateSchedulerEvent)schedulerEvent);
+	        schedulerEvent = eventWrapper;
+	        updateQueueWithNodeUpdate(eventWrapper);
+	      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+	          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+	        // check if having AM Container, update resource usage information
+	        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+	            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+	        ApplicationAttemptId appAttemptId =
+	                appRemoveEvent.getApplicationAttemptID();
+	        String queue = appQueueMap.get(appAttemptId);
+	        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
+	        if (! app.getLiveContainers().isEmpty()) {  // have 0 or 1
+	          // should have one container which is AM container
+	          RMContainer rmc = app.getLiveContainers().iterator().next();
+	          updateQueueMetrics(queue,
+	                  rmc.getContainer().getResource().getMemory(),
+	                  rmc.getContainer().getResource().getVirtualCores());
+	        }
+	      }
+
+	      handlerTimer = schedulerHandleTimer.time();
+	      operationTimer = schedulerHandleTimerMap
+	              .get(schedulerEvent.getType()).time();
+
+	      super.handle(schedulerEvent);
+	    } finally {
+	      if (handlerTimer != null)     handlerTimer.stop();
+	      if (operationTimer != null)   operationTimer.stop();
+	      schedulerHandleCounter.inc();
+	      schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
+
+	      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+	          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+	        SLSRunner.decreaseRemainingApps();
+	        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+	                (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+	        ApplicationAttemptId appAttemptId =
+	                appRemoveEvent.getApplicationAttemptID();
+	        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
+	      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED
+	          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+	        AppAttemptAddedSchedulerEvent appAddEvent =
+	                (AppAttemptAddedSchedulerEvent) schedulerEvent;
+	        String queueName = appAddEvent.getQueue();
+	        appQueueMap.put(appAddEvent.getApplicationAttemptId(), queueName);
+	      }
+	    }
+  }
+
+  private void updateQueueWithNodeUpdate(
+          NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = super.getSchedulerAppInfo(
+                containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container
+          // The app have already removed when the NM sends the release
+          // information.
+          continue;
+        }
+
+        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
+        int releasedMemory = 0, releasedVCores = 0;
+        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          for (RMContainer rmc : app.getLiveContainers()) {
+            if (rmc.getContainerId() == containerId) {
+              releasedMemory += rmc.getContainer().getResource().getMemory();
+              releasedVCores += rmc.getContainer()
+                      .getResource().getVirtualCores();
+              break;
+            }
+          }
+        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
+          if (preemptionContainerMap.containsKey(containerId)) {
+            Resource preResource = preemptionContainerMap.get(containerId);
+            releasedMemory += preResource.getMemory();
+            releasedVCores += preResource.getVirtualCores();
+            preemptionContainerMap.remove(containerId);
+          }
+        }
+        // update queue counters
+        updateQueueMetrics(queue, releasedMemory, releasedVCores);
+      }
+    }
+  }
+
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+                        ApplicationAttemptId attemptId,
+                        List<ResourceRequest> resourceRequests,
+                        List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    String queueName = appQueueMap.get(attemptId);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+                Resources.multiply(request.getCapability(),
+                        request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // container released from AM
+    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
+    for (ContainerId containerId : containerIds) {
+      Container container = null;
+      for (RMContainer c : report.getLiveContainers()) {
+        if (c.getContainerId().equals(containerId)) {
+          container = c.getContainer();
+          break;
+        }
+      }
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        for (RMContainer c : report.getReservedContainers()) {
+          if (c.getContainerId().equals(containerId)) {
+            container = c.getContainer();
+            break;
+          }
+        }
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preemption from scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    if (! preemptionContainers.isEmpty()) {
+      for (ContainerId containerId : preemptionContainers) {
+        if (! preemptionContainerMap.containsKey(containerId)) {
+          Container container = null;
+          for (RMContainer c : report.getLiveContainers()) {
+            if (c.getContainerId().equals(containerId)) {
+              container = c.getContainer();
+              break;
+            }
+          }
+          if (container != null) {
+            preemptionContainerMap.put(containerId, container.getResource());
+          }
+        }
+
+      }
+    }
+
+    // update metrics
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    String names[] = new String[]{
+            "counter.queue." + queueName + ".pending.memory",
+            "counter.queue." + queueName + ".pending.cores",
+            "counter.queue." + queueName + ".allocated.memory",
+            "counter.queue." + queueName + ".allocated.cores"};
+    int values[] = new int[]{pendingResource.getMemory(),
+            pendingResource.getVirtualCores(),
+            allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
+    for (int i = names.length - 1; i >= 0; i --) {
+      if (! counterMap.containsKey(names[i])) {
+        metrics.counter(names[i]);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(names[i]).inc(values[i]);
+    }
+
+    queueLock.lock();
+    try {
+      if (! schedulerMetrics.isTracked(queueName)) {
+        schedulerMetrics.trackQueue(queueName);
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  private void tearDown() throws IOException {
+    // close job runtime writer
+    if (jobRuntimeLogBW != null) {
+      jobRuntimeLogBW.close();
+    }
+    // shut pool
+    if (pool != null)  pool.shutdown();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initMetrics() throws Exception {
+    metrics = new MetricRegistry();
+    // configuration
+    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
+    int metricsWebAddressPort = conf.getInt(
+            SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
+            SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
+    // create SchedulerMetrics for current scheduler
+    String schedulerMetricsType = conf.get(CapacityScheduler.class.getName());
+    Class schedulerMetricsClass = schedulerMetricsType == null?
+            defaultSchedulerMetricsMap.get(CapacityScheduler.class) :
+            Class.forName(schedulerMetricsType);
+    schedulerMetrics = (SchedulerMetrics)ReflectionUtils
+            .newInstance(schedulerMetricsClass, new Configuration());
+    schedulerMetrics.init(this, metrics);
+
+    // register various metrics
+    registerJvmMetrics();
+    registerClusterResourceMetrics();
+    registerContainerAppNumMetrics();
+    registerSchedulerMetrics();
+
+    // .csv output
+    initMetricsCSVOutput();
+
+    // start web app to provide real-time tracking
+    web = new SLSWebApp(this, metricsWebAddressPort);
+    web.start();
+
+    // a thread to update histogram timer
+    pool = new ScheduledThreadPoolExecutor(2);
+    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
+            TimeUnit.MILLISECONDS);
+
+    // a thread to output metrics for real-tiem tracking
+    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
+            TimeUnit.MILLISECONDS);
+
+    // application running information
+    jobRuntimeLogBW = new BufferedWriter(
+            new FileWriter(metricsOutputDir + "/jobruntime.csv"));
+    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
+            "simulate_start_time,simulate_end_time" + EOL);
+    jobRuntimeLogBW.flush();
+  }
+
+  private void registerJvmMetrics() {
+    // add JVM gauges
+    metrics.register("variable.jvm.free.memory",
+      new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return Runtime.getRuntime().freeMemory();
+        }
+      }
+    );
+    metrics.register("variable.jvm.max.memory",
+      new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return Runtime.getRuntime().maxMemory();
+        }
+      }
+    );
+    metrics.register("variable.jvm.total.memory",
+      new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return Runtime.getRuntime().totalMemory();
+        }
+      }
+    );
+  }
+
+  private void registerClusterResourceMetrics() {
+    metrics.register("variable.cluster.allocated.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if( getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAllocatedMB();
+          }
+        }
+      }
+    );
+    metrics.register("variable.cluster.allocated.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if(getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAllocatedVirtualCores();
+          }
+        }
+      }
+    );
+    metrics.register("variable.cluster.available.memory",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if(getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAvailableMB();
+          }
+        }
+      }
+    );
+    metrics.register("variable.cluster.available.vcores",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if(getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAvailableVirtualCores();
+          }
+        }
+      }
+    );
+  }
+
+  private void registerContainerAppNumMetrics() {
+    metrics.register("variable.running.application",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if(getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAppsRunning();
+          }
+        }
+      }
+    );
+    metrics.register("variable.running.container",
+      new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          if(getRootQueueMetrics() == null) {
+            return 0;
+          } else {
+            return getRootQueueMetrics().getAllocatedContainers();
+          }
+        }
+      }
+    );
+  }
+
+  private void registerSchedulerMetrics() {
+    samplerLock.lock();
+    try {
+      // counters for scheduler operations
+      schedulerAllocateCounter = metrics.counter(
+              "counter.scheduler.operation.allocate");
+      schedulerHandleCounter = metrics.counter(
+              "counter.scheduler.operation.handle");
+      schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Counter counter = metrics.counter(
+                "counter.scheduler.operation.handle." + e);
+        schedulerHandleCounterMap.put(e, counter);
+      }
+      // timers for scheduler operations
+      int timeWindowSize = conf.getInt(
+              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
+              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
+      schedulerAllocateTimer = new Timer(
+              new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimer = new Timer(
+              new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
+        schedulerHandleTimerMap.put(e, timer);
+      }
+      // histogram for scheduler operations (Samplers)
+      schedulerHistogramList = new ArrayList<Histogram>();
+      histogramTimerMap = new HashMap<Histogram, Timer>();
+      Histogram schedulerAllocateHistogram = new Histogram(
+              new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.allocate.timecost",
+              schedulerAllocateHistogram);
+      schedulerHistogramList.add(schedulerAllocateHistogram);
+      histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
+      Histogram schedulerHandleHistogram = new Histogram(
+              new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.handle.timecost",
+              schedulerHandleHistogram);
+      schedulerHistogramList.add(schedulerHandleHistogram);
+      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Histogram histogram = new Histogram(
+                new SlidingWindowReservoir(SAMPLING_SIZE));
+        metrics.register(
+                "sampler.scheduler.operation.handle." + e + ".timecost",
+                histogram);
+        schedulerHistogramList.add(histogram);
+        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
+      }
+    } finally {
+      samplerLock.unlock();
+    }
+  }
+
+  private void initMetricsCSVOutput() {
+    int timeIntervalMS = conf.getInt(
+            SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
+            SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
+    File dir = new File(metricsOutputDir + "/metrics");
+    if(! dir.exists()
+            && ! dir.mkdirs()) {
+      LOG.error("Cannot create directory " + dir.getAbsoluteFile());
+    }
+    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
+            .formatFor(Locale.US)
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build(new File(metricsOutputDir + "/metrics"));
+    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
+  }
+
+  class HistogramsRunnable implements Runnable {
+    @Override
+    public void run() {
+      samplerLock.lock();
+      try {
+        for (Histogram histogram : schedulerHistogramList) {
+          Timer timer = histogramTimerMap.get(histogram);
+          histogram.update((int) timer.getSnapshot().getMean());
+        }
+      } finally {
+        samplerLock.unlock();
+      }
+    }
+  }
+
+  class MetricsLogRunnable implements Runnable {
+    private boolean firstLine = true;
+    public MetricsLogRunnable() {
+      try {
+        metricsLogBW = new BufferedWriter(
+                new FileWriter(metricsOutputDir + "/realtimetrack.json"));
+        metricsLogBW.write("[");
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void run() {
+      if(running) {
+        // all WebApp to get real tracking json
+        String metrics = web.generateRealTimeTrackingMetrics();
+        // output
+        try {
+          if(firstLine) {
+            metricsLogBW.write(metrics + EOL);
+            firstLine = false;
+          } else {
+            metricsLogBW.write("," + metrics + EOL);
+          }
+          metricsLogBW.flush();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  // the following functions are used by AMSimulator
+  public void addAMRuntime(ApplicationId appId,
+                           long traceStartTimeMS, long traceEndTimeMS,
+                           long simulateStartTimeMS, long simulateEndTimeMS) {
+
+    try {
+      // write job runtime information
+      StringBuilder sb = new StringBuilder();
+      sb.append(appId).append(",").append(traceStartTimeMS).append(",")
+              .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
+              .append(",").append(simulateEndTimeMS);
+      jobRuntimeLogBW.write(sb.toString() + EOL);
+      jobRuntimeLogBW.flush();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void updateQueueMetrics(String queue,
+                                  int releasedMemory, int releasedVCores) {
+    // update queue counters
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    if (releasedMemory != 0) {
+      String name = "counter.queue." + queue + ".allocated.memory";
+      if (! counterMap.containsKey(name)) {
+        metrics.counter(name);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(name).inc(-releasedMemory);
+    }
+    if (releasedVCores != 0) {
+      String name = "counter.queue." + queue + ".allocated.cores";
+      if (! counterMap.containsKey(name)) {
+        metrics.counter(name);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(name).inc(-releasedVCores);
+    }
+  }
+
+  public void setQueueSet(Set<String> queues) {
+    this.queueSet = queues;
+  }
+
+  public Set<String> getQueueSet() {
+    return this.queueSet;
+  }
+
+  public void setTrackedAppSet(Set<String> apps) {
+    this.trackedAppSet = apps;
+  }
+
+  public Set<String> getTrackedAppSet() {
+    return this.trackedAppSet;
+  }
+
+  public MetricRegistry getMetrics() {
+    return metrics;
+  }
+
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
+  }
+
+  // API open to out classes
+  public void addTrackedApp(ApplicationAttemptId appAttemptId,
+                            String oldAppId) {
+    if (metricsON) {
+      schedulerMetrics.trackApp(appAttemptId, oldAppId);
+    }
+  }
+
+  public void removeTrackedApp(ApplicationAttemptId appAttemptId,
+                               String oldAppId) {
+    if (metricsON) {
+      schedulerMetrics.untrackApp(appAttemptId, oldAppId);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+
+
+
+}
+

Propchange: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java?rev=1552173&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java Thu Dec 19 00:07:59 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import com.codahale.metrics.MetricRegistry;
+
+public interface SchedulerWrapper {
+
+	public MetricRegistry getMetrics();
+	public SchedulerMetrics getSchedulerMetrics();
+	public Set<String> getQueueSet();
+	public void setQueueSet(Set<String> queues);
+	public Set<String> getTrackedAppSet();
+	public void setTrackedAppSet(Set<String> apps);
+	public void addTrackedApp(ApplicationAttemptId appAttemptId,
+              String oldAppId);
+	public void removeTrackedApp(ApplicationAttemptId appAttemptId,
+                 String oldAppId);
+	public void addAMRuntime(ApplicationId appId,
+              long traceStartTimeMS, long traceEndTimeMS,
+              long simulateStartTimeMS, long simulateEndTimeMS);
+
+}

Propchange: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java?rev=1552173&r1=1552172&r2=1552173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java Thu Dec 19 00:07:59 2013
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.sls.SLSRun
 import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics;
 import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
@@ -50,7 +52,7 @@ import org.mortbay.jetty.handler.Resourc
 public class SLSWebApp extends HttpServlet {
   private static final long serialVersionUID = 1905162041950251407L;
   private transient Server server;
-  private transient ResourceSchedulerWrapper wrapper;
+  private transient SchedulerWrapper wrapper;
   private transient MetricRegistry metrics;
   private transient SchedulerMetrics schedulerMetrics;
   // metrics objects
@@ -90,7 +92,7 @@ public class SLSWebApp extends HttpServl
     }
   }
 
-  public SLSWebApp(ResourceSchedulerWrapper wrapper, int metricsAddressPort) {
+  public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) {
     this.wrapper = wrapper;
     metrics = wrapper.getMetrics();
     handleOperTimecostHistogramMap =

Added: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml?rev=1552173&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml Thu Dec 19 00:07:59 2013
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!--
+  This file contains queue allocations for the Capacity Scheduler.
+  Its format is explained in the Capacity Scheduler documentation at
+  http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html.
+  The documentation also includes a sample config file.
+-->
+
+<configuration>
+  <property>
+    <name>yarn.scheduler.capacity.root.queues</name>
+    <value>sls_queue_1,sls_queue_2,sls_queue_3</value>
+    <description>The queues at the this level (root is the root queue).
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_1.capacity</name>
+    <value>25</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_1.maximum-capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_2.capacity</name>
+    <value>25</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_2.maximum-capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_3.capacity</name>
+    <value>50</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.sls_queue_3.maximum-capacity</name>
+    <value>100</value>
+  </property>
+</configuration>

Propchange: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/capacity-scheduler.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml?rev=1552173&r1=1552172&r2=1552173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-sls/src/test/resources/yarn-site.xml Thu Dec 19 00:07:59 2013
@@ -17,7 +17,18 @@
 <configuration>
   <property>
     <name>yarn.resourcemanager.scheduler.class</name>
-    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+	  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+    <!-- <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value> -->
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.scheduler.monitor.enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.scheduler.monitor.policies</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
   </property>
 
   <property>