You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/07 18:29:20 UTC

svn commit: r1601151 [4/5] - in /hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yar...

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sat Jun  7 16:29:10 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -117,7 +118,6 @@ import com.google.common.annotations.Vis
 @SuppressWarnings("unchecked")
 public class FairScheduler extends
     AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
-  private boolean initialized;
   private FairSchedulerConfiguration conf;
 
   private Resource incrAllocation;
@@ -137,6 +137,11 @@ public class FairScheduler extends
   // How often fair shares are re-calculated (ms)
   protected long UPDATE_INTERVAL = 500;
 
+  private Thread updateThread;
+  private Thread schedulingThread;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
 
@@ -182,6 +187,7 @@ public class FairScheduler extends
   AllocationConfiguration allocConf;
   
   public FairScheduler() {
+    super(FairScheduler.class.getName());
     clock = new SystemClock();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
@@ -473,7 +479,8 @@ public class FairScheduler extends
     return resToPreempt;
   }
 
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+  public synchronized RMContainerTokenSecretManager
+      getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
 
@@ -829,6 +836,12 @@ public class FairScheduler extends
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
         clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
 
+    // Set amResource for this app
+    if (!application.getUnmanagedAM() && ask.size() == 1
+        && application.getLiveContainers().isEmpty()) {
+      application.setAMResource(ask.get(0).getCapability());
+    }
+
     // Release containers
     for (ContainerId releasedContainerId : release) {
       RMContainer rmContainer = getRMContainer(releasedContainerId);
@@ -1154,87 +1167,130 @@ public class FairScheduler extends
     // NOT IMPLEMENTED
   }
 
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-              this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      preemptionUtilizationThreshold =
-          this.conf.getPreemptionUtilizationThreshold();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-      
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      // This stores per-application scheduling information
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 
-      initialized = true;
+  private synchronized void initScheduler(Configuration conf)
+      throws IOException {
+    this.conf = new FairSchedulerConfiguration(conf);
+    validateConf(this.conf);
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
+    incrAllocation = this.conf.getIncrementAllocation();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+        this.conf.getContinuousSchedulingSleepMs();
+    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+    preemptionEnabled = this.conf.getPreemptionEnabled();
+    preemptionUtilizationThreshold =
+        this.conf.getPreemptionUtilizationThreshold();
+    assignMultiple = this.conf.getAssignMultiple();
+    maxAssign = this.conf.getMaxAssign();
+    sizeBasedWeight = this.conf.getSizeBasedWeight();
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+    usePortForNodeName = this.conf.getUsePortForNodeName();
+
+    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+    // This stores per-application scheduling information
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
+    this.eventLog = new FairSchedulerEventLog();
+    eventLog.init(this.conf);
 
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
+    allocConf = new AllocationConfiguration(conf);
+    try {
+      queueMgr.initialize(conf);
+    } catch (Exception e) {
+      throw new IOException("Failed to start FairScheduler", e);
+    }
 
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
+    updateThread = new Thread(new UpdateThread());
+    updateThread.setName("FairSchedulerUpdateThread");
+    updateThread.setDaemon(true);
 
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
+    if (continuousSchedulingEnabled) {
+      // start continuous scheduling thread
+      schedulingThread = new Thread(
           new Runnable() {
             @Override
             public void run() {
               continuousScheduling();
             }
           }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
+      );
+      schedulingThread.setName("ContinuousScheduling");
+      schedulingThread.setDaemon(true);
+    }
+
+    allocsLoader.init(conf);
+    allocsLoader.setReloadListener(new AllocationReloadListener());
+    // If we fail to load allocations file on initialize, we want to fail
+    // immediately.  After a successful load, exceptions on future reloads
+    // will just result in leaving things as they are.
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize FairScheduler", e);
+    }
+  }
+
+  private synchronized void startSchedulerThreads() {
+    Preconditions.checkNotNull(updateThread, "updateThread is null");
+    Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+    updateThread.start();
+    if (continuousSchedulingEnabled) {
+      Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+      schedulingThread.start();
+    }
+    allocsLoader.start();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (updateThread != null) {
+        updateThread.interrupt();
+        updateThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
-      
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load allocations file on initialize, we want to fail
-      // immediately.  After a successful load, exceptions on future reloads
-      // will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
+      if (continuousSchedulingEnabled) {
+        if (schedulingThread != null) {
+          schedulingThread.interrupt();
+          schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+        }
       }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
+      if (allocsLoader != null) {
+        allocsLoader.stop();
       }
     }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      LOG.error("Failed to reload allocations file", e);
+    }
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Sat Jun  7 16:29:10 2014
@@ -149,4 +149,15 @@ public abstract class SchedulingPolicy {
    */
   public abstract boolean checkIfUsageOverFairShare(
       Resource usage, Resource fairShare);
+
+  /**
+   * Check if a leaf queue's AM resource usage over its limit under this policy
+   *
+   * @param usage {@link Resource} the resource used by application masters
+   * @param maxAMResource {@link Resource} the maximum allowed resource for
+   *                                      application masters
+   * @return true if AM resource usage is over the limit
+   */
+  public abstract boolean checkIfAMResourceUsageOverLimit(
+      Resource usage, Resource maxAMResource);
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Sat Jun  7 16:29:10 2014
@@ -75,6 +75,11 @@ public class DominantResourceFairnessPol
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return !Resources.fitsIn(usage, maxAMResource);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Sat Jun  7 16:29:10 2014
@@ -125,6 +125,11 @@ public class FairSharePolicy extends Sch
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Sat Jun  7 16:29:10 2014
@@ -95,6 +95,11 @@ public class FifoPolicy extends Scheduli
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Jun  7 16:29:10 2014
@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -111,7 +112,6 @@ public class FifoScheduler extends
 
   Configuration conf;
 
-  private boolean initialized;
   private boolean usePortForNodeName;
 
   private ActiveUsersManager activeUsersManager;
@@ -180,6 +180,47 @@ public class FifoScheduler extends
     }
   };
 
+  public FifoScheduler() {
+    super(FifoScheduler.class.getName());
+  }
+
+  private synchronized void initScheduler(Configuration conf) {
+    validateConf(conf);
+    //Use ConcurrentSkipListMap because applications need to be ordered
+    this.applications =
+        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    this.minimumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+    this.maximumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    this.usePortForNodeName = conf.getBoolean(
+        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+        conf);
+    this.activeUsersManager = new ActiveUsersManager(metrics);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     this.conf = conf;
@@ -216,35 +257,17 @@ public class FifoScheduler extends
   }
 
   @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  @Override
   public synchronized void
       reinitialize(Configuration conf, RMContext rmContext) throws IOException
   {
     setConf(conf);
-    if (!this.initialized) {
-      validateConf(conf);
-      this.rmContext = rmContext;
-      //Use ConcurrentSkipListMap because applications need to be ordered
-      this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
-      this.minimumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-      this.maximumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-      this.usePortForNodeName = conf.getBoolean(
-          YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, 
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
-      this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
-          conf);
-      this.activeUsersManager = new ActiveUsersManager(metrics);
-      this.initialized = true;
-    }
   }
 
-
   @Override
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Sat Jun  7 16:29:10 2014
@@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringEscapeUtils;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte
     super(ctx);
     FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
     fsinfo = new FairSchedulerInfo(scheduler);
-    apps = rmContext.getRMApps();
+    apps = new ConcurrentHashMap<ApplicationId, RMApp>();
+    for (Map.Entry<ApplicationId, RMApp> entry : rmContext.getRMApps().entrySet()) {
+      if (!(RMAppState.NEW.equals(entry.getValue().getState())
+          || RMAppState.NEW_SAVING.equals(entry.getValue().getState())
+          || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) {
+        apps.put(entry.getKey(), entry.getValue());
+      }
+    }
     this.conf = conf;
   }
   

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Sat Jun  7 16:29:10 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -105,14 +104,14 @@ public class MockNM {
   }
 
   public RegisterNodeManagerResponse registerNode(
-      List<ContainerStatus> containerStatus) throws Exception{
+      List<NMContainerStatus> containerReports) throws Exception{
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
     Resource resource = BuilderUtils.newResource(memory, vCores);
     req.setResource(resource);
-    req.setContainerStatuses(containerStatus);
+    req.setContainerStatuses(containerReports);
     req.setNMVersion(version);
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
@@ -185,4 +184,11 @@ public class MockNM {
     return heartbeatResponse;
   }
 
+  public int getMemory() {
+    return memory;
+  }
+
+  public int getvCores() {
+    return vCores;
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Sat Jun  7 16:29:10 2014
@@ -646,7 +646,8 @@ public class TestClientRMService {
     ApplicationId[] appIds =
         {getApplicationId(101), getApplicationId(102), getApplicationId(103)};
     List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+    
+    long[] submitTimeMillis = new long[3];
     // Submit applications
     for (int i = 0; i < appIds.length; i++) {
       ApplicationId appId = appIds[i];
@@ -656,6 +657,7 @@ public class TestClientRMService {
           appId, appNames[i], queues[i % queues.length],
           new HashSet<String>(tags.subList(0, i + 1)));
       rmService.submitApplication(submitRequest);
+      submitTimeMillis[i] = System.currentTimeMillis();
     }
 
     // Test different cases of ClientRMService#getApplications()
@@ -667,6 +669,24 @@ public class TestClientRMService {
     request.setLimit(1L);
     assertEquals("Failed to limit applications", 1,
         rmService.getApplications(request).getApplicationList().size());
+    
+    // Check start range
+    request = GetApplicationsRequest.newInstance();
+    request.setStartRange(submitTimeMillis[0], System.currentTimeMillis());
+    
+    // 2 applications are submitted after first timeMills
+    assertEquals("Incorrect number of matching start range", 
+        2, rmService.getApplications(request).getApplicationList().size());
+    
+    // 1 application is submitted after the second timeMills
+    request.setStartRange(submitTimeMillis[1], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        1, rmService.getApplications(request).getApplicationList().size());
+    
+    // no application is submitted after the third timeMills
+    request.setStartRange(submitTimeMillis[2], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        0, rmService.getApplications(request).getApplicationList().size());
 
     // Check queue
     request = GetApplicationsRequest.newInstance();

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Sat Jun  7 16:29:10 2014
@@ -77,12 +77,12 @@ public class TestFifoScheduler {
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FifoScheduler();
+    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -218,6 +218,9 @@ public class TestFifoScheduler {
   public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
     FifoScheduler scheduler = new FifoScheduler();
     MockRM rm = new MockRM(conf);
+    scheduler.setRMContext(rm.getRMContext());
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, rm.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1,
@@ -293,6 +296,8 @@ public class TestFifoScheduler {
     conf.setQueues("default", new String[] {"default"});
     conf.setCapacity("default", 100);
     FifoScheduler fs = new FifoScheduler();
+    fs.init(conf);
+    fs.start();
     fs.reinitialize(conf, null);
 
     RMNode n1 =
@@ -313,6 +318,7 @@ public class TestFifoScheduler {
     fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+    fs.stop();
   }
   
   @Test (timeout = 50000)

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sat Jun  7 16:29:10 2014
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,13 +68,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -303,13 +305,11 @@ public class TestRMRestart {
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
     nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
 
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     nm2.registerNode();
     
     rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -510,14 +510,11 @@ public class TestRMRestart {
     Assert.assertEquals(RMAppAttemptState.LAUNCHED,
         rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
             .getAppAttemptState());
-    
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(
-            BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart.createNMContainerStatus(
+          am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     launchAM(rmApp, rm2, nm1);
     Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -1678,13 +1675,12 @@ public class TestRMRestart {
     am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     while (loadedApp1.getAppAttempts().size() != 2) {
       Thread.sleep(200);
     }
@@ -1808,12 +1804,10 @@ public class TestRMRestart {
             // ResourceTrackerService is started.
             super.serviceStart();
             nm1.setResourceTrackerService(getResourceTrackerService());
-            List<ContainerStatus> status = new ArrayList<ContainerStatus>();
-            ContainerId amContainer =
-                ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
-            status.add(ContainerStatus.newInstance(amContainer,
-              ContainerState.COMPLETE, "AM container exit", 143));
-            nm1.registerNode(status);
+            NMContainerStatus status =
+                TestRMRestart.createNMContainerStatus(
+                  am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+            nm1.registerNode(Arrays.asList(status));
           }
         };
       }
@@ -1852,6 +1846,15 @@ public class TestRMRestart {
     }
   }
 
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(1024, 1), "recover container", 0);
+    return containerReport;
+  }
+
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Sat Jun  7 16:29:10 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -487,33 +488,37 @@ public class TestResourceTrackerService 
     RMApp app = rm.submitApp(1024, true);
 
     // Case 1.1: AppAttemptId is null
-    ContainerStatus status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    NMContainerStatus report =
+        NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event) any());
 
     // Case 1.2: Master container is null
     RMAppAttemptImpl currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event)any());
 
     // Case 2: Managed AM
     app = rm.submitApp(1024);
 
     // Case 2.1: AppAttemptId is null
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -523,11 +528,12 @@ public class TestResourceTrackerService 
     currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+      ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+      ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+      "Dummy Completed", 0);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Sat Jun  7 16:29:10 2014
@@ -41,6 +41,7 @@ import java.security.NoSuchAlgorithmExce
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -203,7 +204,7 @@ public class TestZKRMStateStoreZKClientC
       LOG.error(error, e);
       fail(error);
     }
-    Assert.assertEquals("newBytes", new String(ret));
+    assertEquals("newBytes", new String(ret));
   }
 
   @Test(timeout = 20000)
@@ -232,7 +233,7 @@ public class TestZKRMStateStoreZKClientC
 
     try {
       byte[] ret = store.getDataWithRetries(path, false);
-      Assert.assertEquals("bytes", new String(ret));
+      assertEquals("bytes", new String(ret));
     } catch (Exception e) {
       String error = "New session creation failed";
       LOG.error(error, e);
@@ -281,4 +282,24 @@ public class TestZKRMStateStoreZKClientC
 
     zkClientTester.getRMStateStore(conf);
   }
+
+  @Test
+  public void testZKRetryInterval() throws Exception {
+    TestZKClient zkClientTester = new TestZKClient();
+    YarnConfiguration conf = new YarnConfiguration();
+
+    ZKRMStateStore store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
+        store.zkRetryInterval);
+    store.stop();
+
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
+            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
+        store.zkRetryInterval);
+    store.stop();
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Jun  7 16:29:10 2014
@@ -121,13 +121,16 @@ public class TestCapacityScheduler {
 
   @After
   public void tearDown() throws Exception {
-    resourceManager.stop();
+    if (resourceManager != null) {
+      resourceManager.stop();
+    }
   }
 
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
     ResourceScheduler scheduler = new CapacityScheduler();
+    scheduler.setRMContext(resourceManager.getRMContext());
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@@ -342,18 +345,23 @@ public class TestCapacityScheduler {
   public void testRefreshQueues() throws Exception {
     CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
-      null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
     checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
 
     conf.setCapacity(A, 80f);
     conf.setCapacity(B, 20f);
     cs.reinitialize(conf, mockContext);
     checkQueueCapacities(cs, 80f, 20f);
+    cs.stop();
   }
 
   private void checkQueueCapacities(CapacityScheduler cs,
@@ -456,6 +464,9 @@ public class TestCapacityScheduler {
     setupQueueConfiguration(csConf);
     CapacityScheduler cs = new CapacityScheduler();
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(csConf);
+    cs.start();
     cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
       null, null, new RMContainerTokenSecretManager(csConf),
       new NMTokenSecretManagerInRM(csConf),
@@ -475,6 +486,7 @@ public class TestCapacityScheduler {
     cs.handle(new NodeAddedSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
+    cs.stop();
   }
 
   @Test
@@ -483,6 +495,9 @@ public class TestCapacityScheduler {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
     cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
       null, new RMContainerTokenSecretManager(conf),
       new NMTokenSecretManagerInRM(conf),
@@ -513,6 +528,7 @@ public class TestCapacityScheduler {
       assertEquals(queueB, queueB4.getParent());
     } finally {
       B3_CAPACITY += B4_CAPACITY;
+      cs.stop();
     }
   }
   @Test

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Sat Jun  7 16:29:10 2014
@@ -114,7 +114,7 @@ public class TestLeafQueue {
     setupQueueConfiguration(csConf, newRoot);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
-    
+
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(conf);
@@ -142,7 +142,9 @@ public class TestLeafQueue {
             queues, queues, 
             TestUtils.spyHook);
 
-    cs.reinitialize(csConf, rmContext);
+    cs.setRMContext(rmContext);
+    cs.init(csConf);
+    cs.start();
   }
   
   private static final String A = "a";
@@ -2080,5 +2082,8 @@ public class TestLeafQueue {
 
   @After
   public void tearDown() throws Exception {
+    if (cs != null) {
+      cs.stop();
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java Sat Jun  7 16:29:10 2014
@@ -43,11 +43,15 @@ public class TestQueueParsing {
     YarnConfiguration conf = new YarnConfiguration(csConf);
 
     CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext = new RMContextImpl(null, null,
+        null, null, null, null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     capacityScheduler.setConf(conf);
-    capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
-      null, null, null, null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    capacityScheduler.reinitialize(conf, rmContext);
     
     CSQueue a = capacityScheduler.getQueue("a");
     Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
@@ -62,6 +66,7 @@ public class TestQueueParsing {
     Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
     Assert.assertEquals(0.7 * 0.55 * 0.7, 
         c12.getAbsoluteMaximumCapacity(), DELTA);
+    capacityScheduler.stop();
   }
   
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -142,7 +147,10 @@ public class TestQueueParsing {
 
     CapacityScheduler capacityScheduler = new CapacityScheduler();
     capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
     capacityScheduler.reinitialize(conf, null);
+    capacityScheduler.stop();
   }
   
   public void testMaxCapacity() throws Exception {
@@ -164,6 +172,8 @@ public class TestQueueParsing {
     try {
       capacityScheduler = new CapacityScheduler();
       capacityScheduler.setConf(new YarnConfiguration());
+      capacityScheduler.init(conf);
+      capacityScheduler.start();
       capacityScheduler.reinitialize(conf, null);
     } catch (IllegalArgumentException iae) {
       fail = true;
@@ -176,6 +186,8 @@ public class TestQueueParsing {
     // Now this should work
     capacityScheduler = new CapacityScheduler();
     capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
     capacityScheduler.reinitialize(conf, null);
     
     fail = false;
@@ -187,6 +199,7 @@ public class TestQueueParsing {
     }
     Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
     		"setMaxCap", fail);
+    capacityScheduler.stop();
   }
   
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Sat Jun  7 16:29:10 2014
@@ -20,14 +20,21 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
 
@@ -169,4 +176,20 @@ public class FairSchedulerTestBase {
     ask.add(request);
     scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null);
   }
+
+  protected void createApplicationWithAMResource(ApplicationAttemptId attId,
+      String queue, String user, Resource amResource) {
+    RMContext rmContext = resourceManager.getRMContext();
+    RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
+        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
+        null, null, null, false, false, 0, amResource, null), null, null,
+        0, null, null);
+    rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
+    AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
+        attId.getApplicationId(), queue, user);
+    scheduler.handle(appAddedEvent);
+    AppAttemptAddedSchedulerEvent attempAddedEvent =
+        new AppAttemptAddedSchedulerEvent(attId, false);
+    scheduler.handle(attempAddedEvent);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java Sat Jun  7 16:29:10 2014
@@ -174,9 +174,10 @@ public class TestAllocationFileLoaderSer
     out.println("<queue name=\"queueC\">");
     out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
     out.println("</queue>");
-    // Give queue D a limit of 3 running apps
+    // Give queue D a limit of 3 running apps and 0.4f maxAMShare
     out.println("<queue name=\"queueD\">");
     out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("<maxAMShare>0.4</maxAMShare>");
     out.println("</queue>");
     // Give queue E a preemption timeout of one minute
     out.println("<queue name=\"queueE\">");
@@ -194,6 +195,8 @@ public class TestAllocationFileLoaderSer
     out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
     // Set default limit of apps per user to 5
     out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Set default limit of AMResourceShare to 0.5f
+    out.println("<queueMaxAMShareDefault>0.5f</queueMaxAMShareDefault>");
     // Give user1 a limit of 10 jobs
     out.println("<user name=\"user1\">");
     out.println("<maxRunningApps>10</maxRunningApps>");
@@ -240,6 +243,13 @@ public class TestAllocationFileLoaderSer
     assertEquals(10, queueConf.getUserMaxApps("user1"));
     assertEquals(5, queueConf.getUserMaxApps("user2"));
 
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
+    assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);
+
     // Root should get * ACL
     assertEquals("*", queueConf.getQueueAcl("root",
         QueueACL.ADMINISTER_QUEUE).getAclString());

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Sat Jun  7 16:29:10 2014
@@ -48,6 +48,8 @@ public class TestFSLeafQueue {
     ResourceManager resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     String queueName = "root.queue1";