You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/03/24 15:25:32 UTC

[hadoop] branch trunk updated: YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth.

This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5261424  YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth.
5261424 is described below

commit 526142447abdee02b86820d884b577b23b769663
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Thu Mar 24 16:24:33 2022 +0100

    YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth.
---
 .../yarn/sls/scheduler/SLSCapacityScheduler.java   | 328 ++-------------------
 .../yarn/sls/scheduler/SLSFairScheduler.java       | 291 +-----------------
 ...FairScheduler.java => SLSSchedulerCommons.java} | 295 +++++++++---------
 3 files changed, 188 insertions(+), 726 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index e0cb151..b6fe5c0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -30,119 +25,51 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Timer;
 
 @Private
 @Unstable
 public class SLSCapacityScheduler extends CapacityScheduler implements
         SchedulerWrapper,Configurable {
-  private Configuration conf;
- 
-  private Map<ApplicationAttemptId, String> appQueueMap =
-          new ConcurrentHashMap<ApplicationAttemptId, String>();
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-          new ConcurrentHashMap<ContainerId, Resource>();
-
-  // metrics
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private Tracker tracker;
-
-  // logger
-  private static final Logger LOG = LoggerFactory.getLogger(SLSCapacityScheduler.class);
 
-  public Tracker getTracker() {
-    return tracker;
-  }
+  private final SLSSchedulerCommons schedulerCommons;
+  private Configuration conf;
 
   public SLSCapacityScheduler() {
-    tracker = new Tracker();
+    schedulerCommons = new SLSSchedulerCommons(this);
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
     super.setConf(conf);
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        schedulerMetrics = SchedulerMetrics.getInstance(conf,
-            CapacityScheduler.class);
-        schedulerMetrics.init(this, conf);
-      } catch (Exception e) {
-        LOG.error("Caught exception while initializing schedulerMetrics", e);
-      }
-    }
+    schedulerCommons.initMetrics(CapacityScheduler.class, conf);
   }
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
       List<ResourceRequest> resourceRequests,
       List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
-      List<String> strings, List<String> strings2, ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
-          .time();
-      Allocation allocation = null;
-      try {
-        allocation = super
-            .allocate(attemptId, resourceRequests, schedulingRequests,
-                containerIds, strings,
-                strings2, updateRequests);
-        return allocation;
-      } catch (Exception e) {
-        LOG.error("Caught exception from allocate", e);
-        throw e;
-      } finally {
-        context.stop();
-        schedulerMetrics.increaseSchedulerAllocationCounter();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-                  resourceRequests, containerIds);
-        } catch (IOException e) {
-          LOG.error("Caught exception while executing finally block", e);
-        }
-      }
-    } else {
-      return super.allocate(attemptId, resourceRequests, schedulingRequests,
-          containerIds, strings,
-          strings2, updateRequests);
-    }
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests,
+        containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
   }
 
 
   @Override
   public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
       boolean updatePending) {
-    if (metricsON) {
+    if (schedulerCommons.isMetricsON()) {
       boolean isSuccess = false;
       long startTimeNs = System.nanoTime();
       try {
@@ -151,13 +78,13 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
       } finally {
         long elapsedNs = System.nanoTime() - startTimeNs;
         if (isSuccess) {
-          schedulerMetrics.getSchedulerCommitSuccessTimer()
+          getSchedulerMetrics().getSchedulerCommitSuccessTimer()
               .update(elapsedNs, TimeUnit.NANOSECONDS);
-          schedulerMetrics.increaseSchedulerCommitSuccessCounter();
+          getSchedulerMetrics().increaseSchedulerCommitSuccessCounter();
         } else {
-          schedulerMetrics.getSchedulerCommitFailureTimer()
+          getSchedulerMetrics().getSchedulerCommitFailureTimer()
               .update(elapsedNs, TimeUnit.NANOSECONDS);
-          schedulerMetrics.increaseSchedulerCommitFailureCounter();
+          getSchedulerMetrics().increaseSchedulerCommitFailureCounter();
         }
       }
     } else {
@@ -167,222 +94,26 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-    if (!metricsON) {
-      super.handle(schedulerEvent);
-      return;
-    }
-
-    if (!schedulerMetrics.isRunning()) {
-      schedulerMetrics.setRunning(true);
-    }
-
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-            (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (schedulerEvent.getType() ==
-          SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-            appRemoveEvent.getApplicationAttemptID();
-        String queue = appQueueMap.get(appAttemptId);
-        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
-        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          schedulerMetrics.updateQueueMetricsByRelease(
-              rmc.getContainer().getResource(), queue);
-        }
-      }
-
-      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
-      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
-          schedulerEvent.getType()).time();
-
-      super.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null) {
-        handlerTimer.stop();
-      }
-      if (operationTimer != null) {
-        operationTimer.stop();
-      }
-      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
-        if (SLSRunner.getRemainingApps() == 0) {
-          try {
-            getSchedulerMetrics().tearDown();
-            SLSRunner.exitSLSRunner();
-          } catch (Exception e) {
-            LOG.error("Scheduler Metrics failed to tear down.", e);
-          }
-        }
-      } else if (schedulerEvent.getType() ==
-          SchedulerEventType.APP_ATTEMPT_ADDED
-          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
-        AppAttemptAddedSchedulerEvent appAddEvent =
-            (AppAttemptAddedSchedulerEvent) schedulerEvent;
-        SchedulerApplication app =
-            applications.get(appAddEvent.getApplicationAttemptId()
-                .getApplicationId());
-        appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
-            .getQueueName());
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-          NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = super.getSchedulerAppInfo(
-                containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              releasedMemory += rmc.getContainer().getResource().getMemorySize();
-              releasedVCores += rmc.getContainer()
-                      .getResource().getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        schedulerMetrics.updateQueueMetricsByRelease(
-            Resource.newInstance(releasedMemory, releasedVCores), queue);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-                        ApplicationAttemptId attemptId,
-                        List<ResourceRequest> resourceRequests,
-                        List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    String queueName = appQueueMap.get(attemptId);
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-                Resources.multiply(request.getCapability(),
-                        request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (! preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (! preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
-        queueName);
+    schedulerCommons.handle(schedulerEvent);
   }
 
   @Override
   public void serviceStop() throws Exception {
-    try {
-      if (metricsON) {
-        schedulerMetrics.tearDown();
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception while stopping service", e);
-    }
+    schedulerCommons.stopMetrics();
     super.serviceStop();
   }
 
 
+  public String getRealQueueName(String queue) throws YarnException {
+    if (getQueue(queue) == null) {
+      throw new YarnException("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueue(queue).getQueuePath();
+  }
+
   public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
+    return schedulerCommons.getSchedulerMetrics();
   }
 
   @Override
@@ -390,11 +121,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
     return conf;
   }
 
-  public String getRealQueueName(String queue) throws YarnException {
-    if (getQueue(queue) == null) {
-      throw new YarnException("Can't find the queue by the given name: " + queue
-          + "! Please check if queue " + queue + " is in the allocation file.");
-    }
-    return getQueue(queue).getQueuePath();
+  public Tracker getTracker() {
+    return schedulerCommons.getTracker();
   }
-}
\ No newline at end of file
+}
+
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index 84549bc..b164316 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -17,84 +17,35 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import com.codahale.metrics.Timer;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Private
 @Unstable
 public class SLSFairScheduler extends FairScheduler
     implements SchedulerWrapper, Configurable {
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private Tracker tracker;
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-      new ConcurrentHashMap<>();
-
-  // logger
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SLSFairScheduler.class);
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  public Tracker getTracker() {
-    return tracker;
-  }
+  private final SLSSchedulerCommons schedulerCommons;
 
   public SLSFairScheduler() {
-    tracker = new Tracker();
+    schedulerCommons = new SLSSchedulerCommons(this);
   }
 
   @Override
   public void setConf(Configuration conf) {
     super.setConfig(conf);
-
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        schedulerMetrics = SchedulerMetrics.getInstance(conf,
-            FairScheduler.class);
-        schedulerMetrics.init(this, conf);
-      } catch (Exception e) {
-        LOG.error("Caught exception while initializing schedulerMetrics", e);
-      }
-    }
+    schedulerCommons.initMetrics(FairScheduler.class, conf);
   }
 
   @Override
@@ -103,237 +54,18 @@ public class SLSFairScheduler extends FairScheduler
       List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
-          .time();
-      Allocation allocation = null;
-      try {
-        allocation = super.allocate(attemptId, resourceRequests,
-            schedulingRequests, containerIds,
-            blacklistAdditions, blacklistRemovals, updateRequests);
-        return allocation;
-      } catch (Exception e) {
-        LOG.error("Caught exception from allocate", e);
-        throw e;
-      } finally {
-        context.stop();
-        schedulerMetrics.increaseSchedulerAllocationCounter();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-              resourceRequests, containerIds);
-        } catch (IOException e) {
-          LOG.error("Caught exception while executing finally block", e);
-        }
-      }
-    } else {
-      return super.allocate(attemptId, resourceRequests, schedulingRequests,
-          containerIds,
-          blacklistAdditions, blacklistRemovals, updateRequests);
-    }
+    return schedulerCommons.allocate(attemptId, resourceRequests, schedulingRequests,
+        containerIds, blacklistAdditions, blacklistRemovals, updateRequests);
   }
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-    // metrics off
-    if (!metricsON) {
-      super.handle(schedulerEvent);
-      return;
-    }
-
-    // metrics on
-    if(!schedulerMetrics.isRunning()) {
-      schedulerMetrics.setRunning(true);
-    }
-
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-            (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (
-          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-            appRemoveEvent.getApplicationAttemptID();
-        String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
-        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
-        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          schedulerMetrics.updateQueueMetricsByRelease(
-              rmc.getContainer().getResource(), queueName);
-        }
-      }
-
-      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
-      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
-          schedulerEvent.getType()).time();
-
-      super.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null) {
-        handlerTimer.stop();
-      }
-      if (operationTimer != null) {
-        operationTimer.stop();
-      }
-      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        if (SLSRunner.getRemainingApps() == 0) {
-          try {
-            getSchedulerMetrics().tearDown();
-            SLSRunner.exitSLSRunner();
-          } catch (Exception e) {
-            LOG.error("Scheduler Metrics failed to tear down.", e);
-          }
-        }
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-      NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = super.getSchedulerAppInfo(
-            containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              Resource resource = rmc.getContainer().getResource();
-              releasedMemory += resource.getMemorySize();
-              releasedVCores += resource.getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
-            getQueueName();
-        schedulerMetrics.updateQueueMetricsByRelease(
-            Resource.newInstance(releasedMemory, releasedVCores), queue);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-      ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests,
-      List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (!preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (!preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    String queueName = getSchedulerApp(attemptId).getQueueName();
-    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
-        queueName);
+    schedulerCommons.handle(schedulerEvent);
   }
 
   @Override
   public void serviceStop() throws Exception {
-    try {
-      if (metricsON) {
-        schedulerMetrics.tearDown();
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception while stopping service", e);
-    }
+    schedulerCommons.stopMetrics();
     super.serviceStop();
   }
 
@@ -344,5 +76,12 @@ public class SLSFairScheduler extends FairScheduler
     }
     return getQueueManager().getQueue(queue).getQueueName();
   }
-}
 
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerCommons.getSchedulerMetrics();
+  }
+
+  public Tracker getTracker() {
+    return schedulerCommons.getTracker();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
similarity index 79%
copy from hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
copy to hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
index 84549bc..92aa960 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,21 +6,17 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.yarn.sls.scheduler;
 
 import com.codahale.metrics.Timer;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -30,17 +26,18 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -54,61 +51,59 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-@Private
-@Unstable
-public class SLSFairScheduler extends FairScheduler
-    implements SchedulerWrapper, Configurable {
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private Tracker tracker;
+public class SLSSchedulerCommons {
+  private static final Logger LOG = LoggerFactory.getLogger(SLSSchedulerCommons.class);
 
+  private AbstractYarnScheduler scheduler;
+  private boolean metricsON;
+  private SchedulerMetrics schedulerMetrics;
   private Map<ContainerId, Resource> preemptionContainerMap =
       new ConcurrentHashMap<>();
 
-  // logger
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SLSFairScheduler.class);
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  public Tracker getTracker() {
-    return tracker;
-  }
-
-  public SLSFairScheduler() {
-    tracker = new Tracker();
+  private Map<ApplicationAttemptId, String> appQueueMap =
+      new ConcurrentHashMap<>();
+  private Tracker tracker;
+  
+  public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
+    this.scheduler = scheduler;
+    this.tracker = new Tracker();
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConfig(conf);
-
+  public void initMetrics(Class<? extends AbstractYarnScheduler> schedulerClass, Configuration conf) {
     metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
     if (metricsON) {
       try {
-        schedulerMetrics = SchedulerMetrics.getInstance(conf,
-            FairScheduler.class);
-        schedulerMetrics.init(this, conf);
+        schedulerMetrics = SchedulerMetrics.getInstance(conf, schedulerClass);
+        schedulerMetrics.init(scheduler, conf);
       } catch (Exception e) {
         LOG.error("Caught exception while initializing schedulerMetrics", e);
       }
     }
   }
 
-  @Override
+  void stopMetrics() {
+    try {
+      if (metricsON) {
+        schedulerMetrics.tearDown();
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception while stopping service", e);
+    }
+  }
+  
   public Allocation allocate(ApplicationAttemptId attemptId,
       List<ResourceRequest> resourceRequests,
-      List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
-      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> containerIds,
+      List<String> blacklistAdditions,
+      List<String> blacklistRemovals,
       ContainerUpdates updateRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
           .time();
       Allocation allocation = null;
       try {
-        allocation = super.allocate(attemptId, resourceRequests,
+        allocation = scheduler.allocate(attemptId, resourceRequests,
             schedulingRequests, containerIds,
             blacklistAdditions, blacklistRemovals, updateRequests);
         return allocation;
@@ -126,22 +121,97 @@ public class SLSFairScheduler extends FairScheduler
         }
       }
     } else {
-      return super.allocate(attemptId, resourceRequests, schedulingRequests,
+      return scheduler.allocate(attemptId, resourceRequests, schedulingRequests,
           containerIds,
           blacklistAdditions, blacklistRemovals, updateRequests);
     }
   }
 
-  @Override
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    String queueName = appQueueMap.get(attemptId);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // container released from AM
+    SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
+    for (ContainerId containerId : containerIds) {
+      Container container = null;
+      for (RMContainer c : report.getLiveContainers()) {
+        if (c.getContainerId().equals(containerId)) {
+          container = c.getContainer();
+          break;
+        }
+      }
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        for (RMContainer c : report.getReservedContainers()) {
+          if (c.getContainerId().equals(containerId)) {
+            container = c.getContainer();
+            break;
+          }
+        }
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preemption from scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    if (!preemptionContainers.isEmpty()) {
+      for (ContainerId containerId : preemptionContainers) {
+        if (!preemptionContainerMap.containsKey(containerId)) {
+          Container container = null;
+          for (RMContainer c : report.getLiveContainers()) {
+            if (c.getContainerId().equals(containerId)) {
+              container = c.getContainer();
+              break;
+            }
+          }
+          if (container != null) {
+            preemptionContainerMap.put(containerId, container.getResource());
+          }
+        }
+
+      }
+    }
+
+    // update metrics
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
+  }
+
   public void handle(SchedulerEvent schedulerEvent) {
-    // metrics off
     if (!metricsON) {
-      super.handle(schedulerEvent);
+      scheduler.handle(schedulerEvent);
       return;
     }
 
-    // metrics on
-    if(!schedulerMetrics.isRunning()) {
+    if (!schedulerMetrics.isRunning()) {
       schedulerMetrics.setRunning(true);
     }
 
@@ -153,24 +223,24 @@ public class SLSFairScheduler extends FairScheduler
       if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
           && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
         eventWrapper = new NodeUpdateSchedulerEventWrapper(
-            (NodeUpdateSchedulerEvent)schedulerEvent);
+            (NodeUpdateSchedulerEvent) schedulerEvent);
         schedulerEvent = eventWrapper;
         updateQueueWithNodeUpdate(eventWrapper);
       } else if (
           schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+              && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
         // check if having AM Container, update resource usage information
         AppAttemptRemovedSchedulerEvent appRemoveEvent =
             (AppAttemptRemovedSchedulerEvent) schedulerEvent;
         ApplicationAttemptId appAttemptId =
             appRemoveEvent.getApplicationAttemptID();
-        String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
-        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
+        String queue = appQueueMap.get(appAttemptId);
+        SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
         if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
           // should have one container which is AM container
           RMContainer rmc = app.getLiveContainers().iterator().next();
           schedulerMetrics.updateQueueMetricsByRelease(
-              rmc.getContainer().getResource(), queueName);
+              rmc.getContainer().getResource(), queue);
         }
       }
 
@@ -178,7 +248,7 @@ public class SLSFairScheduler extends FairScheduler
       operationTimer = schedulerMetrics.getSchedulerHandleTimer(
           schedulerEvent.getType()).time();
 
-      super.handle(schedulerEvent);
+      scheduler.handle(schedulerEvent);
     } finally {
       if (handlerTimer != null) {
         handlerTimer.stop();
@@ -191,14 +261,27 @@ public class SLSFairScheduler extends FairScheduler
       if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
           && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
         SLSRunner.decreaseRemainingApps();
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
         if (SLSRunner.getRemainingApps() == 0) {
           try {
-            getSchedulerMetrics().tearDown();
+            schedulerMetrics.tearDown();
             SLSRunner.exitSLSRunner();
           } catch (Exception e) {
             LOG.error("Scheduler Metrics failed to tear down.", e);
           }
         }
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_ADDED
+          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+        AppAttemptAddedSchedulerEvent appAddEvent =
+            (AppAttemptAddedSchedulerEvent) schedulerEvent;
+        SchedulerApplication app =
+            (SchedulerApplication) scheduler.getSchedulerApplications()
+                .get(appAddEvent.getApplicationAttemptId().getApplicationId());
+        appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
+            .getQueueName());
       }
     }
   }
@@ -210,7 +293,7 @@ public class SLSFairScheduler extends FairScheduler
     for (UpdatedContainerInfo info : containerList) {
       for (ContainerStatus status : info.getCompletedContainers()) {
         ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = super.getSchedulerAppInfo(
+        SchedulerAppReport app = scheduler.getSchedulerAppInfo(
             containerId.getApplicationAttemptId());
 
         if (app == null) {
@@ -239,110 +322,22 @@ public class SLSFairScheduler extends FairScheduler
           }
         }
         // update queue counters
-        String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
-            getQueueName();
+        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
         schedulerMetrics.updateQueueMetricsByRelease(
             Resource.newInstance(releasedMemory, releasedVCores), queue);
       }
     }
   }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-      ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests,
-      List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (!preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (!preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    String queueName = getSchedulerApp(attemptId).getQueueName();
-    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
-        queueName);
+  
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
   }
 
-  @Override
-  public void serviceStop() throws Exception {
-    try {
-      if (metricsON) {
-        schedulerMetrics.tearDown();
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception while stopping service", e);
-    }
-    super.serviceStop();
+  public boolean isMetricsON() {
+    return metricsON;
   }
 
-  public String getRealQueueName(String queue) throws YarnException {
-    if (!getQueueManager().exists(queue)) {
-      throw new YarnException("Can't find the queue by the given name: " + queue
-          + "! Please check if queue " + queue + " is in the allocation file.");
-    }
-    return getQueueManager().getQueue(queue).getQueueName();
+  public Tracker getTracker() {
+    return tracker;
   }
 }
-

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org