You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ka...@apache.org on 2014/06/01 21:11:24 UTC

svn commit: r1599025 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...

Author: kasha
Date: Sun Jun  1 19:11:23 2014
New Revision: 1599025

URL: http://svn.apache.org/r1599025
Log:
YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Jun  1 19:11:23 2014
@@ -115,6 +115,8 @@ Release 2.5.0 - UNRELEASED
     NMContainerStatus which has more information that is needed for
     work-preserving RM-restart. (Jian He via vinodkv)
 
+    YARN-1474. Make sechedulers services. (Tsuyoshi Ozawa via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sun Jun  1 19:11:23 2014
@@ -401,6 +401,8 @@ public class ResourceManager extends Com
 
       // Initialize the scheduler
       scheduler = createScheduler();
+      scheduler.setRMContext(rmContext);
+      addIfService(scheduler);
       rmContext.setScheduler(scheduler);
 
       schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +431,6 @@ public class ResourceManager extends Com
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
 
-      try {
-        scheduler.reinitialize(conf, rmContext);
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to initialize scheduler", ioe);
-      }
-
       // creating monitors that handle preemption
       createPolicyMonitors();
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sun Jun  1 19:11:23 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.util.resou
 
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
-    implements ResourceScheduler {
+    extends AbstractService implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
 
@@ -62,6 +63,15 @@ public abstract class AbstractYarnSchedu
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
     EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sun Jun  1 19:11:23 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Set RMContext for <code>ResourceScheduler</code>.
+   * This method should be called immediately after instantiating
+   * a scheduler once.
+   * @param rmContext created by ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun Jun  1 19:11:23 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private CSQueue root;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
   static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
     @Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
 
   private int numNodeManagers = 0;
 
-  private boolean initialized = false;
-
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
-  public CapacityScheduler() {}
+  public CapacityScheduler() {
+    super(CapacityScheduler.class.getName());
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public RMContext getRMContext() {
+  public synchronized RMContext getRMContext() {
     return this.rmContext;
   }
-  
+
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  private synchronized void initScheduler(Configuration configuration) throws
+      IOException {
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    this.minimumAllocation = this.conf.getMinimumAllocation();
+    this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.calculator = this.conf.getResourceCalculator();
+    this.usePortForNodeName = this.conf.getUsePortForNodeName();
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,
+            SchedulerApplication<FiCaSchedulerApp>>();
+
+    initializeQueues(this.conf);
+
+    scheduleAsynchronously = this.conf.getScheduleAynschronously();
+    asyncScheduleInterval =
+        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+            DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with " +
+        "calculator=" + getResourceCalculator().getClass() + ", " +
+        "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+        "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+        "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+        "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  private synchronized void startSchedulerThreads() {
+    if (scheduleAsynchronously) {
+      Preconditions.checkNotNull(asyncSchedulerThread,
+          "asyncSchedulerThread is null");
+      asyncSchedulerThread.start();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    if (!initialized) {
-      this.rmContext = rmContext;
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      this.minimumAllocation = this.conf.getMinimumAllocation();
-      this.maximumAllocation = this.conf.getMaximumAllocation();
-      this.calculator = this.conf.getResourceCalculator();
-      this.usePortForNodeName = this.conf.getUsePortForNodeName();
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    initScheduler(configuration);
+    super.serviceInit(conf);
+  }
 
-      initializeQueues(this.conf);
-      
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = 
-          this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, 
-              DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-      if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
-        asyncSchedulerThread.start();
-      }
-      
-      initialized = true;
-      LOG.info("Initialized CapacityScheduler with " +
-          "calculator=" + getResourceCalculator().getClass() + ", " +
-          "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-          "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
-          "asynchronousScheduling=" + scheduleAsynchronously + ", " +
-          "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-      
-    } else {
-      CapacitySchedulerConfiguration oldConf = this.conf; 
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      try {
-        LOG.info("Re-initializing queues...");
-        reinitializeQueues(this.conf);
-      } catch (Throwable t) {
-        this.conf = oldConf;
-        throw new IOException("Failed to re-init queues", t);
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        asyncSchedulerThread.interrupt();
+        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
     }
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void
+  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+    Configuration configuration = new Configuration(conf);
+    CapacitySchedulerConfiguration oldConf = this.conf;
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    try {
+      LOG.info("Re-initializing queues...");
+      reinitializeQueues(this.conf);
+    } catch (Throwable t) {
+      this.conf = oldConf;
+      throw new IOException("Failed to re-init queues", t);
+    }
   }
   
   long getAsyncScheduleInterval() {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Sun Jun  1 19:11:23 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-  
+
+  public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   private final Clock clock;
 
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -146,7 +148,14 @@ public class AllocationFileLoaderService
   @Override
   public void stop() {
     running = false;
-    reloadThread.interrupt();
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+      try {
+        reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.warn("reloadThread fails to join.");
+      }
+    }
     super.stop();
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sun Jun  1 19:11:23 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -117,7 +118,6 @@ import com.google.common.annotations.Vis
 @SuppressWarnings("unchecked")
 public class FairScheduler extends
     AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
-  private boolean initialized;
   private FairSchedulerConfiguration conf;
 
   private Resource incrAllocation;
@@ -137,6 +137,11 @@ public class FairScheduler extends
   // How often fair shares are re-calculated (ms)
   protected long UPDATE_INTERVAL = 500;
 
+  private Thread updateThread;
+  private Thread schedulingThread;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
 
@@ -182,6 +187,7 @@ public class FairScheduler extends
   AllocationConfiguration allocConf;
   
   public FairScheduler() {
+    super(FairScheduler.class.getName());
     clock = new SystemClock();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
@@ -473,7 +479,8 @@ public class FairScheduler extends
     return resToPreempt;
   }
 
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+  public synchronized RMContainerTokenSecretManager
+      getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
 
@@ -1154,87 +1161,130 @@ public class FairScheduler extends
     // NOT IMPLEMENTED
   }
 
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-              this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      preemptionUtilizationThreshold =
-          this.conf.getPreemptionUtilizationThreshold();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-      
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      // This stores per-application scheduling information
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 
-      initialized = true;
+  private synchronized void initScheduler(Configuration conf)
+      throws IOException {
+    this.conf = new FairSchedulerConfiguration(conf);
+    validateConf(this.conf);
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
+    incrAllocation = this.conf.getIncrementAllocation();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+        this.conf.getContinuousSchedulingSleepMs();
+    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+    preemptionEnabled = this.conf.getPreemptionEnabled();
+    preemptionUtilizationThreshold =
+        this.conf.getPreemptionUtilizationThreshold();
+    assignMultiple = this.conf.getAssignMultiple();
+    maxAssign = this.conf.getMaxAssign();
+    sizeBasedWeight = this.conf.getSizeBasedWeight();
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+    usePortForNodeName = this.conf.getUsePortForNodeName();
+
+    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+    // This stores per-application scheduling information
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
+    this.eventLog = new FairSchedulerEventLog();
+    eventLog.init(this.conf);
 
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
+    allocConf = new AllocationConfiguration(conf);
+    try {
+      queueMgr.initialize(conf);
+    } catch (Exception e) {
+      throw new IOException("Failed to start FairScheduler", e);
+    }
 
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
+    updateThread = new Thread(new UpdateThread());
+    updateThread.setName("FairSchedulerUpdateThread");
+    updateThread.setDaemon(true);
 
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
+    if (continuousSchedulingEnabled) {
+      // start continuous scheduling thread
+      schedulingThread = new Thread(
           new Runnable() {
             @Override
             public void run() {
               continuousScheduling();
             }
           }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
+      );
+      schedulingThread.setName("ContinuousScheduling");
+      schedulingThread.setDaemon(true);
+    }
+
+    allocsLoader.init(conf);
+    allocsLoader.setReloadListener(new AllocationReloadListener());
+    // If we fail to load allocations file on initialize, we want to fail
+    // immediately.  After a successful load, exceptions on future reloads
+    // will just result in leaving things as they are.
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize FairScheduler", e);
+    }
+  }
+
+  private synchronized void startSchedulerThreads() {
+    Preconditions.checkNotNull(updateThread, "updateThread is null");
+    Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+    updateThread.start();
+    if (continuousSchedulingEnabled) {
+      Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+      schedulingThread.start();
+    }
+    allocsLoader.start();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (updateThread != null) {
+        updateThread.interrupt();
+        updateThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
-      
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load allocations file on initialize, we want to fail
-      // immediately.  After a successful load, exceptions on future reloads
-      // will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
+      if (continuousSchedulingEnabled) {
+        if (schedulingThread != null) {
+          schedulingThread.interrupt();
+          schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+        }
       }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
+      if (allocsLoader != null) {
+        allocsLoader.stop();
       }
     }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      LOG.error("Failed to reload allocations file", e);
+    }
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sun Jun  1 19:11:23 2014
@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -111,7 +112,6 @@ public class FifoScheduler extends
 
   Configuration conf;
 
-  private boolean initialized;
   private boolean usePortForNodeName;
 
   private ActiveUsersManager activeUsersManager;
@@ -180,6 +180,47 @@ public class FifoScheduler extends
     }
   };
 
+  public FifoScheduler() {
+    super(FifoScheduler.class.getName());
+  }
+
+  /** Runs validateConf on conf, then builds the scheduler state read from it. */
+  private synchronized void initScheduler(Configuration conf) {
+    validateConf(conf);
+    // Applications have to stay ordered, hence a ConcurrentSkipListMap.
+    this.applications =
+        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    final int minMemoryMb = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    this.minimumAllocation = Resources.createResource(minMemoryMb);
+    final int maxMemoryMb = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    this.maximumAllocation = Resources.createResource(maxMemoryMb);
+    this.usePortForNodeName = conf.getBoolean(
+        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf);
+    this.activeUsersManager = new ActiveUsersManager(metrics);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     this.conf = conf;
@@ -216,35 +257,17 @@ public class FifoScheduler extends
   }
 
   @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  @Override
   public synchronized void
       reinitialize(Configuration conf, RMContext rmContext) throws IOException
   {
     setConf(conf);
-    if (!this.initialized) {
-      validateConf(conf);
-      this.rmContext = rmContext;
-      //Use ConcurrentSkipListMap because applications need to be ordered
-      this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
-      this.minimumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-      this.maximumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-      this.usePortForNodeName = conf.getBoolean(
-          YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, 
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
-      this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
-          conf);
-      this.activeUsersManager = new ActiveUsersManager(metrics);
-      this.initialized = true;
-    }
   }
 
-
   @Override
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Sun Jun  1 19:11:23 2014
@@ -77,12 +77,12 @@ public class TestFifoScheduler {
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FifoScheduler();
+    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -218,6 +218,9 @@ public class TestFifoScheduler {
   public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
     FifoScheduler scheduler = new FifoScheduler();
     MockRM rm = new MockRM(conf);
+    scheduler.setRMContext(rm.getRMContext());
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, rm.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1,
@@ -293,6 +296,8 @@ public class TestFifoScheduler {
     conf.setQueues("default", new String[] {"default"});
     conf.setCapacity("default", 100);
     FifoScheduler fs = new FifoScheduler();
+    fs.init(conf);
+    fs.start();
     fs.reinitialize(conf, null);
 
     RMNode n1 =
@@ -313,6 +318,7 @@ public class TestFifoScheduler {
     fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+    fs.stop();
   }
   
   @Test (timeout = 50000)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sun Jun  1 19:11:23 2014
@@ -121,13 +121,16 @@ public class TestCapacityScheduler {
 
   @After
   public void tearDown() throws Exception {
-    resourceManager.stop();
+    if (resourceManager != null) {
+      resourceManager.stop();
+    }
   }
 
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
     ResourceScheduler scheduler = new CapacityScheduler();
+    scheduler.setRMContext(resourceManager.getRMContext());
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@@ -342,18 +345,23 @@ public class TestCapacityScheduler {
   public void testRefreshQueues() throws Exception {
     CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
-      null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
     checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
 
     conf.setCapacity(A, 80f);
     conf.setCapacity(B, 20f);
     cs.reinitialize(conf, mockContext);
     checkQueueCapacities(cs, 80f, 20f);
+    cs.stop();
   }
 
   private void checkQueueCapacities(CapacityScheduler cs,
@@ -456,6 +464,9 @@ public class TestCapacityScheduler {
     setupQueueConfiguration(csConf);
     CapacityScheduler cs = new CapacityScheduler();
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(csConf);
+    cs.start();
     cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
       null, null, new RMContainerTokenSecretManager(csConf),
       new NMTokenSecretManagerInRM(csConf),
@@ -475,6 +486,7 @@ public class TestCapacityScheduler {
     cs.handle(new NodeAddedSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
+    cs.stop();
   }
 
   @Test
@@ -483,6 +495,9 @@ public class TestCapacityScheduler {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
     cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
       null, new RMContainerTokenSecretManager(conf),
       new NMTokenSecretManagerInRM(conf),
@@ -513,6 +528,7 @@ public class TestCapacityScheduler {
       assertEquals(queueB, queueB4.getParent());
     } finally {
       B3_CAPACITY += B4_CAPACITY;
+      cs.stop();
     }
   }
   @Test

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Sun Jun  1 19:11:23 2014
@@ -114,7 +114,7 @@ public class TestLeafQueue {
     setupQueueConfiguration(csConf, newRoot);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
-    
+
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(conf);
@@ -142,7 +142,9 @@ public class TestLeafQueue {
             queues, queues, 
             TestUtils.spyHook);
 
-    cs.reinitialize(csConf, rmContext);
+    cs.setRMContext(rmContext);
+    cs.init(csConf);
+    cs.start();
   }
   
   private static final String A = "a";
@@ -2080,5 +2082,8 @@ public class TestLeafQueue {
 
   @After
   public void tearDown() throws Exception {
+    if (cs != null) {
+      cs.stop();
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java Sun Jun  1 19:11:23 2014
@@ -43,11 +43,15 @@ public class TestQueueParsing {
     YarnConfiguration conf = new YarnConfiguration(csConf);
 
     CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext = new RMContextImpl(null, null,
+        null, null, null, null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     capacityScheduler.setConf(conf);
-    capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
-      null, null, null, null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    capacityScheduler.reinitialize(conf, rmContext);
     
     CSQueue a = capacityScheduler.getQueue("a");
     Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
@@ -62,6 +66,7 @@ public class TestQueueParsing {
     Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
     Assert.assertEquals(0.7 * 0.55 * 0.7, 
         c12.getAbsoluteMaximumCapacity(), DELTA);
+    capacityScheduler.stop();
   }
   
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -142,7 +147,10 @@ public class TestQueueParsing {
 
     CapacityScheduler capacityScheduler = new CapacityScheduler();
     capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
     capacityScheduler.reinitialize(conf, null);
+    capacityScheduler.stop();
   }
   
   public void testMaxCapacity() throws Exception {
@@ -164,6 +172,8 @@ public class TestQueueParsing {
     try {
       capacityScheduler = new CapacityScheduler();
       capacityScheduler.setConf(new YarnConfiguration());
+      capacityScheduler.init(conf);
+      capacityScheduler.start();
       capacityScheduler.reinitialize(conf, null);
     } catch (IllegalArgumentException iae) {
       fail = true;
@@ -176,6 +186,8 @@ public class TestQueueParsing {
     // Now this should work
     capacityScheduler = new CapacityScheduler();
     capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
     capacityScheduler.reinitialize(conf, null);
     
     fail = false;
@@ -187,6 +199,7 @@ public class TestQueueParsing {
     }
     Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
     		"setMaxCap", fail);
+    capacityScheduler.stop();
   }
   
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Sun Jun  1 19:11:23 2014
@@ -48,6 +48,8 @@ public class TestFSLeafQueue {
     ResourceManager resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     String queueName = "root.queue1";

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Sun Jun  1 19:11:23 2014
@@ -120,6 +120,8 @@ public class TestFairScheduler extends F
 
     // to initialize the master key
     resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+    scheduler.setRMContext(resourceManager.getRMContext());
   }
 
   @After
@@ -133,12 +135,12 @@ public class TestFairScheduler extends F
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FairScheduler();
+    FairScheduler scheduler = new FairScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -152,7 +154,7 @@ public class TestFairScheduler extends F
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min vcores allocation is" +
         " larger than the max vcores allocation.");
     } catch (YarnRuntimeException e) {
@@ -184,6 +186,8 @@ public class TestFairScheduler extends F
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 
       128);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     Assert.assertEquals(true, scheduler.assignMultiple);
     Assert.assertEquals(3, scheduler.maxAssign);
@@ -211,6 +215,7 @@ public class TestFairScheduler extends F
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
     conf.setInt(
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
+    fs.init(conf);
     fs.reinitialize(conf, null);
     Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
     Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
@@ -228,8 +233,9 @@ public class TestFairScheduler extends F
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
     conf.setInt(
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
-    fs.reinitialize(conf, null);  
-    Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());  
+    fs.init(conf);
+    fs.reinitialize(conf, null);
+    Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
     Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
     Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
     Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
@@ -237,6 +243,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testAggregateCapacityTracking() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -262,6 +270,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testSimpleFairShareCalculation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -289,6 +299,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testSimpleHierarchicalFairShareCalculation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -322,6 +334,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testHierarchicalQueuesSimilarParents() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     QueueManager queueManager = scheduler.getQueueManager();
@@ -346,6 +360,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testSchedulerRootQueueMetrics() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -385,6 +401,8 @@ public class TestFairScheduler extends F
 
   @Test (timeout = 5000)
   public void testSimpleContainerAllocation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -433,6 +451,8 @@ public class TestFairScheduler extends F
 
   @Test (timeout = 5000)
   public void testSimpleContainerReservation() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -487,6 +507,8 @@ public class TestFairScheduler extends F
   @Test
   public void testUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     RMContext rmContext = resourceManager.getRMContext();
     Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@@ -513,6 +535,8 @@ public class TestFairScheduler extends F
   @Test
   public void testNotUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     RMContext rmContext = resourceManager.getRMContext();
     Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@@ -539,6 +563,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testEmptyQueueName() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // only default queue
@@ -559,8 +585,10 @@ public class TestFairScheduler extends F
   @Test
   public void testAssignToQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
     RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
     
@@ -577,6 +605,8 @@ public class TestFairScheduler extends F
   @Test
   public void testAssignToNonLeafQueueReturnsNull() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
@@ -594,6 +624,8 @@ public class TestFairScheduler extends F
   public void testQueuePlacementWithPolicy() throws Exception {
     conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId appId;
@@ -654,7 +686,9 @@ public class TestFairScheduler extends F
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -703,6 +737,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
 
@@ -735,6 +771,8 @@ public class TestFairScheduler extends F
     RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
     RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     int capacity = 16 * 1024;
@@ -769,6 +807,8 @@ public class TestFairScheduler extends F
    */
   @Test
   public void testQueueDemandCalculation() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
@@ -819,6 +859,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testAppAdditionAndRemoval() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
@@ -869,6 +911,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     QueueManager queueManager = scheduler.getQueueManager();
@@ -901,7 +945,9 @@ public class TestFairScheduler extends F
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
     
@@ -928,6 +974,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -985,8 +1033,10 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
         MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@@ -1057,7 +1107,9 @@ public class TestFairScheduler extends F
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Create two nodes
@@ -1224,6 +1276,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Create four nodes
@@ -1321,6 +1375,8 @@ public class TestFairScheduler extends F
 
   @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -1363,8 +1419,10 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -1404,6 +1462,8 @@ public class TestFairScheduler extends F
   
   @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -1484,8 +1544,10 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
         "norealuserhasthisname", 1);
     ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
@@ -1499,6 +1561,8 @@ public class TestFairScheduler extends F
   
   @Test (timeout = 5000)
   public void testMultipleNodesSingleRackRequest() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1548,6 +1612,8 @@ public class TestFairScheduler extends F
   
   @Test (timeout = 5000)
   public void testFifoWithinQueue() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1592,6 +1658,8 @@ public class TestFairScheduler extends F
   @Test(timeout = 3000)
   public void testMaxAssign() throws Exception {
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node =
@@ -1635,6 +1703,8 @@ public class TestFairScheduler extends F
    */
   @Test(timeout = 5000)
   public void testAssignContainer() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     final String user = "user1";
@@ -1718,9 +1788,11 @@ public class TestFairScheduler extends F
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     int appId = this.APP_ID++;
     String user = "usernotallow";
     String queue = "queue1";
@@ -1769,6 +1841,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testReservationThatDoesntFit() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1797,6 +1871,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
@@ -1825,6 +1901,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1865,6 +1943,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testCancelStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1915,6 +1995,8 @@ public class TestFairScheduler extends F
    */
   @Test
   public void testReservationsStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1955,6 +2037,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testNoMoreCpuOnNode() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
@@ -1976,6 +2060,8 @@ public class TestFairScheduler extends F
 
   @Test
   public void testBasicDRFAssignment() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
@@ -2016,6 +2102,8 @@ public class TestFairScheduler extends F
    */
   @Test
   public void testBasicDRFWithQueues() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
@@ -2052,6 +2140,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testDRFHierarchicalQueues() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
@@ -2120,7 +2210,9 @@ public class TestFairScheduler extends F
   public void testHostPortNodeName() throws Exception {
     conf.setBoolean(YarnConfiguration
         .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
-    scheduler.reinitialize(conf, 
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf,
         resourceManager.getRMContext());
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
         1, "127.0.0.1", 1);
@@ -2200,9 +2292,11 @@ public class TestFairScheduler extends F
     out.println("</user>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
     verifyAppRunnable(attId1, true);
@@ -2254,9 +2348,11 @@ public class TestFairScheduler extends F
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
     verifyAppRunnable(attId1, true);
@@ -2316,6 +2412,9 @@ public class TestFairScheduler extends F
     Configuration conf = createConfiguration();
     conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
             true);
+    fs.setRMContext(resourceManager.getRMContext());
+    fs.init(conf);
+    fs.start();
     fs.reinitialize(conf, resourceManager.getRMContext());
     Assert.assertTrue("Continuous scheduling should be enabled.",
             fs.isContinuousSchedulingEnabled());
@@ -2396,6 +2495,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
     
@@ -2439,6 +2540,8 @@ public class TestFairScheduler extends F
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy
@@ -2455,6 +2558,8 @@ public class TestFairScheduler extends F
   @SuppressWarnings("resource")
   @Test
   public void testBlacklistNodes() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     final int GB = 1024;
@@ -2507,6 +2612,8 @@ public class TestFairScheduler extends F
   
   @Test
   public void testGetAppsInQueue() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId appAttId1 =
@@ -2552,8 +2659,10 @@ public class TestFairScheduler extends F
 
   @Test
   public void testMoveRunnableApp() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     QueueManager queueMgr = scheduler.getQueueManager();
     FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
     FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2591,8 +2700,10 @@ public class TestFairScheduler extends F
   
   @Test
   public void testMoveNonRunnableApp() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     QueueManager queueMgr = scheduler.getQueueManager();
     FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
     FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2611,8 +2722,10 @@ public class TestFairScheduler extends F
   
   @Test
   public void testMoveMakesAppRunnable() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     QueueManager queueMgr = scheduler.getQueueManager();
     FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
     FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2638,8 +2751,10 @@ public class TestFairScheduler extends F
     
   @Test (expected = YarnException.class)
   public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     QueueManager queueMgr = scheduler.getQueueManager();
     queueMgr.getLeafQueue("queue2", true);
     scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0);
@@ -2652,8 +2767,10 @@ public class TestFairScheduler extends F
   
   @Test (expected = YarnException.class)
   public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     QueueManager queueMgr = scheduler.getQueueManager();
     FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
     queueMgr.getLeafQueue("queue2", true);
@@ -2675,6 +2792,8 @@ public class TestFairScheduler extends F
   
   @Test (expected = YarnException.class)
   public void testMoveToNonexistentQueue() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     scheduler.getQueueManager().getLeafQueue("queue1", true);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java Sun Jun  1 19:11:23 2014
@@ -51,6 +51,8 @@ public class TestFairSchedulerEventLog {
     resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
   }
 
@@ -69,7 +71,13 @@ public class TestFairSchedulerEventLog {
   public void tearDown() {
     logFile.delete();
     logFile.getParentFile().delete(); // fairscheduler/
-    scheduler = null;
-    resourceManager = null;
+    if (scheduler != null) {
+      scheduler.stop();
+      scheduler = null;
+    }
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sun Jun  1 19:11:23 2014
@@ -145,9 +145,13 @@ public class TestFifoScheduler {
     RMContext rmContext = new RMContextImpl(dispatcher, null,
         null, null, null, null, null, null, null, writer);
 
-    FifoScheduler schedular = new FifoScheduler();
-    schedular.reinitialize(new Configuration(), rmContext);
-    QueueMetrics metrics = schedular.getRootQueueMetrics();
+    FifoScheduler scheduler = new FifoScheduler();
+    Configuration conf = new Configuration();
+    scheduler.setRMContext(rmContext);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, rmContext);
+    QueueMetrics metrics = scheduler.getRootQueueMetrics();
     int beforeAppsSubmitted = metrics.getAppsSubmitted();
 
     ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
@@ -155,18 +159,19 @@ public class TestFifoScheduler {
         appId, 1);
 
     SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
-    schedular.handle(appEvent);
+    scheduler.handle(appEvent);
     SchedulerEvent attemptEvent =
         new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    schedular.handle(attemptEvent);
+    scheduler.handle(attemptEvent);
 
     appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
     SchedulerEvent attemptEvent2 =
         new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    schedular.handle(attemptEvent2);
+    scheduler.handle(attemptEvent2);
 
     int afterAppsSubmitted = metrics.getAppsSubmitted();
     Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
+    scheduler.stop();
   }
 
   @Test(timeout=2000)
@@ -184,6 +189,9 @@ public class TestFifoScheduler {
         null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
 
     FifoScheduler scheduler = new FifoScheduler();
+    scheduler.setRMContext(rmContext);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(new Configuration(), rmContext);
 
     RMNode node0 = MockNodes.newNodeInfo(1,
@@ -232,6 +240,7 @@ public class TestFifoScheduler {
     //Also check that the containers were scheduled
     SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
     Assert.assertEquals(3, info.getLiveContainers().size());
+    scheduler.stop();
   }
   
   @Test(timeout=2000)
@@ -254,6 +263,9 @@ public class TestFifoScheduler {
         return nodes;
       }
     };
+    scheduler.setRMContext(rmContext);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(new Configuration(), rmContext);
     RMNode node0 = MockNodes.newNodeInfo(1,
         Resources.createResource(2048, 4), 1, "127.0.0.1");

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java Sun Jun  1 19:11:23 2014
@@ -203,10 +203,11 @@ public class TestRMWebApp {
 
     CapacityScheduler cs = new CapacityScheduler();
     cs.setConf(new YarnConfiguration());
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+    cs.setRMContext(new RMContextImpl(null, null, null, null, null,
         null, new RMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInRM(conf),
         new ClientToAMTokenSecretManagerInRM(), null));
+    cs.init(conf);
     return cs;
   }
 
@@ -269,19 +270,21 @@ public class TestRMWebApp {
     ResourceManager rm = mock(ResourceManager.class);
     RMContext rmContext = mockRMContext(apps, racks, nodes,
         mbsPerNode);
-    ResourceScheduler rs = mockFifoScheduler();
+    ResourceScheduler rs = mockFifoScheduler(rmContext);
     when(rm.getResourceScheduler()).thenReturn(rs);
     when(rm.getRMContext()).thenReturn(rmContext);
     return rm;
   }
 
-  public static FifoScheduler mockFifoScheduler() throws Exception {
+  public static FifoScheduler mockFifoScheduler(RMContext rmContext)
+      throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupFifoQueueConfiguration(conf);
 
     FifoScheduler fs = new FifoScheduler();
     fs.setConf(new YarnConfiguration());
-    fs.reinitialize(conf, null);
+    fs.setRMContext(rmContext);
+    fs.init(conf);
     return fs;
   }