You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ka...@apache.org on 2014/06/01 21:11:24 UTC
svn commit: r1599025 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Author: kasha
Date: Sun Jun 1 19:11:23 2014
New Revision: 1599025
URL: http://svn.apache.org/r1599025
Log:
YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Jun 1 19:11:23 2014
@@ -115,6 +115,8 @@ Release 2.5.0 - UNRELEASED
NMContainerStatus which has more information that is needed for
work-preserving RM-restart. (Jian He via vinodkv)
+ YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sun Jun 1 19:11:23 2014
@@ -401,6 +401,8 @@ public class ResourceManager extends Com
// Initialize the scheduler
scheduler = createScheduler();
+ scheduler.setRMContext(rmContext);
+ addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +431,6 @@ public class ResourceManager extends Com
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
- try {
- scheduler.reinitialize(conf, rmContext);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to initialize scheduler", ioe);
- }
-
// creating monitors that handle preemption
createPolicyMonitors();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sun Jun 1 19:11:23 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.util.resou
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
- implements ResourceScheduler {
+ extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
@@ -62,6 +63,15 @@ public abstract class AbstractYarnSchedu
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+ /**
+ * Construct the service.
+ *
+ * @param name service name
+ */
+ public AbstractYarnScheduler(String name) {
+ super(name);
+ }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sun Jun 1 19:11:23 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+ /**
+ * Set RMContext for <code>ResourceScheduler</code>.
+ * This method should be called exactly once, immediately after
+ * instantiating a scheduler.
+ * @param rmContext created by ResourceManager
+ */
+ void setRMContext(RMContext rmContext);
+
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun Jun 1 19:11:23 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private CSQueue root;
+ // timeout to join when we stop this service
+ protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
private int numNodeManagers = 0;
- private boolean initialized = false;
-
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
- public CapacityScheduler() {}
+ public CapacityScheduler() {
+ super(CapacityScheduler.class.getName());
+ }
@Override
public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
}
@Override
- public RMContext getRMContext() {
+ public synchronized RMContext getRMContext() {
return this.rmContext;
}
-
+
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ private synchronized void initScheduler(Configuration configuration) throws
+ IOException {
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ this.minimumAllocation = this.conf.getMinimumAllocation();
+ this.maximumAllocation = this.conf.getMaximumAllocation();
+ this.calculator = this.conf.getResourceCalculator();
+ this.usePortForNodeName = this.conf.getUsePortForNodeName();
+ this.applications =
+ new ConcurrentHashMap<ApplicationId,
+ SchedulerApplication<FiCaSchedulerApp>>();
+
+ initializeQueues(this.conf);
+
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ asyncScheduleInterval =
+ this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+ if (scheduleAsynchronously) {
+ asyncSchedulerThread = new AsyncScheduleThread(this);
+ }
+
+ LOG.info("Initialized CapacityScheduler with " +
+ "calculator=" + getResourceCalculator().getClass() + ", " +
+ "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+ "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+ "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+ "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+ }
+
+ private synchronized void startSchedulerThreads() {
+ if (scheduleAsynchronously) {
+ Preconditions.checkNotNull(asyncSchedulerThread,
+ "asyncSchedulerThread is null");
+ asyncSchedulerThread.start();
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- if (!initialized) {
- this.rmContext = rmContext;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
- this.calculator = this.conf.getResourceCalculator();
- this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ initScheduler(configuration);
+ super.serviceInit(conf);
+ }
- initializeQueues(this.conf);
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval =
- this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
- if (scheduleAsynchronously) {
- asyncSchedulerThread = new AsyncScheduleThread(this);
- asyncSchedulerThread.start();
- }
-
- initialized = true;
- LOG.info("Initialized CapacityScheduler with " +
- "calculator=" + getResourceCalculator().getClass() + ", " +
- "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
- "asynchronousScheduling=" + scheduleAsynchronously + ", " +
- "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-
- } else {
- CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- try {
- LOG.info("Re-initializing queues...");
- reinitializeQueues(this.conf);
- } catch (Throwable t) {
- this.conf = oldConf;
- throw new IOException("Failed to re-init queues", t);
+ @Override
+ public void serviceStart() throws Exception {
+ startSchedulerThreads();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized (this) {
+ if (scheduleAsynchronously && asyncSchedulerThread != null) {
+ asyncSchedulerThread.interrupt();
+ asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized void
+ reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ Configuration configuration = new Configuration(conf);
+ CapacitySchedulerConfiguration oldConf = this.conf;
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ try {
+ LOG.info("Re-initializing queues...");
+ reinitializeQueues(this.conf);
+ } catch (Throwable t) {
+ this.conf = oldConf;
+ throw new IOException("Failed to re-init queues", t);
+ }
}
long getAsyncScheduleInterval() {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Sun Jun 1 19:11:23 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-
+
+ public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -146,7 +148,14 @@ public class AllocationFileLoaderService
@Override
public void stop() {
running = false;
- reloadThread.interrupt();
+ if (reloadThread != null) {
+ reloadThread.interrupt();
+ try {
+ reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("reloadThread fails to join.");
+ }
+ }
super.stop();
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sun Jun 1 19:11:23 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -117,7 +118,6 @@ import com.google.common.annotations.Vis
@SuppressWarnings("unchecked")
public class FairScheduler extends
AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
- private boolean initialized;
private FairSchedulerConfiguration conf;
private Resource incrAllocation;
@@ -137,6 +137,11 @@ public class FairScheduler extends
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
+ private Thread updateThread;
+ private Thread schedulingThread;
+ // timeout to join when we stop this service
+ protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
// Aggregate metrics
FSQueueMetrics rootMetrics;
@@ -182,6 +187,7 @@ public class FairScheduler extends
AllocationConfiguration allocConf;
public FairScheduler() {
+ super(FairScheduler.class.getName());
clock = new SystemClock();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
@@ -473,7 +479,8 @@ public class FairScheduler extends
return resToPreempt;
}
- public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+ public synchronized RMContainerTokenSecretManager
+ getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
@@ -1154,87 +1161,130 @@ public class FairScheduler extends
// NOT IMPLEMENTED
}
- @Override
- public synchronized void reinitialize(Configuration conf, RMContext rmContext)
- throws IOException {
- if (!initialized) {
- this.conf = new FairSchedulerConfiguration(conf);
- validateConf(this.conf);
- minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
- incrAllocation = this.conf.getIncrementAllocation();
- continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
- nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
- rackLocalityThreshold = this.conf.getLocalityThresholdRack();
- nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
- rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
- preemptionEnabled = this.conf.getPreemptionEnabled();
- preemptionUtilizationThreshold =
- this.conf.getPreemptionUtilizationThreshold();
- assignMultiple = this.conf.getAssignMultiple();
- maxAssign = this.conf.getMaxAssign();
- sizeBasedWeight = this.conf.getSizeBasedWeight();
- preemptionInterval = this.conf.getPreemptionInterval();
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
- usePortForNodeName = this.conf.getUsePortForNodeName();
-
- rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
- this.rmContext = rmContext;
- // This stores per-application scheduling information
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
- this.eventLog = new FairSchedulerEventLog();
- eventLog.init(this.conf);
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
- initialized = true;
+ private synchronized void initScheduler(Configuration conf)
+ throws IOException {
+ this.conf = new FairSchedulerConfiguration(conf);
+ validateConf(this.conf);
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
+ incrAllocation = this.conf.getIncrementAllocation();
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+ continuousSchedulingSleepMs =
+ this.conf.getContinuousSchedulingSleepMs();
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+ preemptionEnabled = this.conf.getPreemptionEnabled();
+ preemptionUtilizationThreshold =
+ this.conf.getPreemptionUtilizationThreshold();
+ assignMultiple = this.conf.getAssignMultiple();
+ maxAssign = this.conf.getMaxAssign();
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+ usePortForNodeName = this.conf.getUsePortForNodeName();
+
+ rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+ // This stores per-application scheduling information
+ this.applications =
+ new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
+ this.eventLog = new FairSchedulerEventLog();
+ eventLog.init(this.conf);
- allocConf = new AllocationConfiguration(conf);
- try {
- queueMgr.initialize(conf);
- } catch (Exception e) {
- throw new IOException("Failed to start FairScheduler", e);
- }
+ allocConf = new AllocationConfiguration(conf);
+ try {
+ queueMgr.initialize(conf);
+ } catch (Exception e) {
+ throw new IOException("Failed to start FairScheduler", e);
+ }
- Thread updateThread = new Thread(new UpdateThread());
- updateThread.setName("FairSchedulerUpdateThread");
- updateThread.setDaemon(true);
- updateThread.start();
+ updateThread = new Thread(new UpdateThread());
+ updateThread.setName("FairSchedulerUpdateThread");
+ updateThread.setDaemon(true);
- if (continuousSchedulingEnabled) {
- // start continuous scheduling thread
- Thread schedulingThread = new Thread(
+ if (continuousSchedulingEnabled) {
+ // create the continuous scheduling thread; it is started in startSchedulerThreads()
+ schedulingThread = new Thread(
new Runnable() {
@Override
public void run() {
continuousScheduling();
}
}
- );
- schedulingThread.setName("ContinuousScheduling");
- schedulingThread.setDaemon(true);
- schedulingThread.start();
+ );
+ schedulingThread.setName("ContinuousScheduling");
+ schedulingThread.setDaemon(true);
+ }
+
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationReloadListener());
+ // If we fail to load allocations file on initialize, we want to fail
+ // immediately. After a successful load, exceptions on future reloads
+ // will just result in leaving things as they are.
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize FairScheduler", e);
+ }
+ }
+
+ private synchronized void startSchedulerThreads() {
+ Preconditions.checkNotNull(updateThread, "updateThread is null");
+ Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+ updateThread.start();
+ if (continuousSchedulingEnabled) {
+ Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+ schedulingThread.start();
+ }
+ allocsLoader.start();
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ startSchedulerThreads();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized (this) {
+ if (updateThread != null) {
+ updateThread.interrupt();
+ updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
-
- allocsLoader.init(conf);
- allocsLoader.setReloadListener(new AllocationReloadListener());
- // If we fail to load allocations file on initialize, we want to fail
- // immediately. After a successful load, exceptions on future reloads
- // will just result in leaving things as they are.
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- throw new IOException("Failed to initialize FairScheduler", e);
+ if (continuousSchedulingEnabled) {
+ if (schedulingThread != null) {
+ schedulingThread.interrupt();
+ schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+ }
}
- allocsLoader.start();
- } else {
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- LOG.error("Failed to reload allocations file", e);
+ if (allocsLoader != null) {
+ allocsLoader.stop();
}
}
+
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ LOG.error("Failed to reload allocations file", e);
+ }
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sun Jun 1 19:11:23 2014
@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -111,7 +112,6 @@ public class FifoScheduler extends
Configuration conf;
- private boolean initialized;
private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager;
@@ -180,6 +180,47 @@ public class FifoScheduler extends
}
};
+ public FifoScheduler() {
+ super(FifoScheduler.class.getName());
+ }
+
+ private synchronized void initScheduler(Configuration conf) {
+ validateConf(conf);
+ //Use ConcurrentSkipListMap because applications need to be ordered
+ this.applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ this.minimumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ this.maximumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ this.usePortForNodeName = conf.getBoolean(
+ YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+ this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+ conf);
+ this.activeUsersManager = new ActiveUsersManager(metrics);
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
@Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
@@ -216,35 +257,17 @@ public class FifoScheduler extends
}
@Override
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
setConf(conf);
- if (!this.initialized) {
- validateConf(conf);
- this.rmContext = rmContext;
- //Use ConcurrentSkipListMap because applications need to be ordered
- this.applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
- this.minimumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- this.maximumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- this.usePortForNodeName = conf.getBoolean(
- YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
- this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
- conf);
- this.activeUsersManager = new ActiveUsersManager(metrics);
- this.initialized = true;
- }
}
-
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Sun Jun 1 19:11:23 2014
@@ -77,12 +77,12 @@ public class TestFifoScheduler {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- ResourceScheduler scheduler = new FifoScheduler();
+ FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@@ -218,6 +218,9 @@ public class TestFifoScheduler {
public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
FifoScheduler scheduler = new FifoScheduler();
MockRM rm = new MockRM(conf);
+ scheduler.setRMContext(rm.getRMContext());
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, rm.getRMContext());
RMNode node = MockNodes.newNodeInfo(1,
@@ -293,6 +296,8 @@ public class TestFifoScheduler {
conf.setQueues("default", new String[] {"default"});
conf.setCapacity("default", 100);
FifoScheduler fs = new FifoScheduler();
+ fs.init(conf);
+ fs.start();
fs.reinitialize(conf, null);
RMNode n1 =
@@ -313,6 +318,7 @@ public class TestFifoScheduler {
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+ fs.stop();
}
@Test (timeout = 50000)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sun Jun 1 19:11:23 2014
@@ -121,13 +121,16 @@ public class TestCapacityScheduler {
@After
public void tearDown() throws Exception {
- resourceManager.stop();
+ if (resourceManager != null) {
+ resourceManager.stop();
+ }
}
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new CapacityScheduler();
+ scheduler.setRMContext(resourceManager.getRMContext());
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@@ -342,18 +345,23 @@ public class TestCapacityScheduler {
public void testRefreshQueues() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+ null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
+ cs.setRMContext(resourceManager.getRMContext());
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, rmContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, 80f, 20f);
+ cs.stop();
}
private void checkQueueCapacities(CapacityScheduler cs,
@@ -456,6 +464,9 @@ public class TestCapacityScheduler {
setupQueueConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ cs.init(csConf);
+ cs.start();
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
@@ -475,6 +486,7 @@ public class TestCapacityScheduler {
cs.handle(new NodeAddedSchedulerEvent(n1));
Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
+ cs.stop();
}
@Test
@@ -483,6 +495,9 @@ public class TestCapacityScheduler {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ cs.init(conf);
+ cs.start();
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
@@ -513,6 +528,7 @@ public class TestCapacityScheduler {
assertEquals(queueB, queueB4.getParent());
} finally {
B3_CAPACITY += B4_CAPACITY;
+ cs.stop();
}
}
@Test
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Sun Jun 1 19:11:23 2014
@@ -114,7 +114,7 @@ public class TestLeafQueue {
setupQueueConfiguration(csConf, newRoot);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
-
+
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
@@ -142,7 +142,9 @@ public class TestLeafQueue {
queues, queues,
TestUtils.spyHook);
- cs.reinitialize(csConf, rmContext);
+ cs.setRMContext(rmContext);
+ cs.init(csConf);
+ cs.start();
}
private static final String A = "a";
@@ -2080,5 +2082,8 @@ public class TestLeafQueue {
@After
public void tearDown() throws Exception {
+ if (cs != null) {
+ cs.stop();
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java Sun Jun 1 19:11:23 2014
@@ -43,11 +43,15 @@ public class TestQueueParsing {
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext = new RMContextImpl(null, null,
+ null, null, null, null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
capacityScheduler.setConf(conf);
- capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
- null, null, null, null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
+ capacityScheduler.reinitialize(conf, rmContext);
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
@@ -62,6 +66,7 @@ public class TestQueueParsing {
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.7 * 0.55 * 0.7,
c12.getAbsoluteMaximumCapacity(), DELTA);
+ capacityScheduler.stop();
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -142,7 +147,10 @@ public class TestQueueParsing {
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
+ capacityScheduler.stop();
}
public void testMaxCapacity() throws Exception {
@@ -164,6 +172,8 @@ public class TestQueueParsing {
try {
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
} catch (IllegalArgumentException iae) {
fail = true;
@@ -176,6 +186,8 @@ public class TestQueueParsing {
// Now this should work
capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(new YarnConfiguration());
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
capacityScheduler.reinitialize(conf, null);
fail = false;
@@ -187,6 +199,7 @@ public class TestQueueParsing {
}
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
"setMaxCap", fail);
+ capacityScheduler.stop();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Sun Jun 1 19:11:23 2014
@@ -48,6 +48,8 @@ public class TestFSLeafQueue {
ResourceManager resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
String queueName = "root.queue1";
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Sun Jun 1 19:11:23 2014
@@ -120,6 +120,8 @@ public class TestFairScheduler extends F
// to initialize the master key
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+ scheduler.setRMContext(resourceManager.getRMContext());
}
@After
@@ -133,12 +135,12 @@ public class TestFairScheduler extends F
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- ResourceScheduler scheduler = new FairScheduler();
+ FairScheduler scheduler = new FairScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@@ -152,7 +154,7 @@ public class TestFairScheduler extends F
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
@@ -184,6 +186,8 @@ public class TestFairScheduler extends F
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
128);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign);
@@ -211,6 +215,7 @@ public class TestFairScheduler extends F
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
+ fs.init(conf);
fs.reinitialize(conf, null);
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
@@ -228,8 +233,9 @@ public class TestFairScheduler extends F
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
- fs.reinitialize(conf, null);
- Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
+ fs.init(conf);
+ fs.reinitialize(conf, null);
+ Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
@@ -237,6 +243,8 @@ public class TestFairScheduler extends F
@Test
public void testAggregateCapacityTracking() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -262,6 +270,8 @@ public class TestFairScheduler extends F
@Test
public void testSimpleFairShareCalculation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -289,6 +299,8 @@ public class TestFairScheduler extends F
@Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -322,6 +334,8 @@ public class TestFairScheduler extends F
@Test
public void testHierarchicalQueuesSimilarParents() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -346,6 +360,8 @@ public class TestFairScheduler extends F
@Test
public void testSchedulerRootQueueMetrics() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -385,6 +401,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testSimpleContainerAllocation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -433,6 +451,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testSimpleContainerReservation() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -487,6 +507,8 @@ public class TestFairScheduler extends F
@Test
public void testUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@@ -513,6 +535,8 @@ public class TestFairScheduler extends F
@Test
public void testNotUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
@@ -539,6 +563,8 @@ public class TestFairScheduler extends F
@Test
public void testEmptyQueueName() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// only default queue
@@ -559,8 +585,10 @@ public class TestFairScheduler extends F
@Test
public void testAssignToQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
@@ -577,6 +605,8 @@ public class TestFairScheduler extends F
@Test
public void testAssignToNonLeafQueueReturnsNull() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
@@ -594,6 +624,8 @@ public class TestFairScheduler extends F
public void testQueuePlacementWithPolicy() throws Exception {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appId;
@@ -654,7 +686,9 @@ public class TestFairScheduler extends F
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -703,6 +737,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
@@ -735,6 +771,8 @@ public class TestFairScheduler extends F
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
int capacity = 16 * 1024;
@@ -769,6 +807,8 @@ public class TestFairScheduler extends F
*/
@Test
public void testQueueDemandCalculation() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
@@ -819,6 +859,8 @@ public class TestFairScheduler extends F
@Test
public void testAppAdditionAndRemoval() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
@@ -869,6 +911,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -901,7 +945,9 @@ public class TestFairScheduler extends F
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -928,6 +974,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -985,8 +1033,10 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@@ -1057,7 +1107,9 @@ public class TestFairScheduler extends F
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create two nodes
@@ -1224,6 +1276,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create four nodes
@@ -1321,6 +1375,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -1363,8 +1419,10 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// Add a node
RMNode node1 =
MockNodes
@@ -1404,6 +1462,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -1484,8 +1544,10 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname", 1);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
@@ -1499,6 +1561,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testMultipleNodesSingleRackRequest() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1548,6 +1612,8 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
public void testFifoWithinQueue() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1592,6 +1658,8 @@ public class TestFairScheduler extends F
@Test(timeout = 3000)
public void testMaxAssign() throws Exception {
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node =
@@ -1635,6 +1703,8 @@ public class TestFairScheduler extends F
*/
@Test(timeout = 5000)
public void testAssignContainer() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final String user = "user1";
@@ -1718,9 +1788,11 @@ public class TestFairScheduler extends F
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
int appId = this.APP_ID++;
String user = "usernotallow";
String queue = "queue1";
@@ -1769,6 +1841,8 @@ public class TestFairScheduler extends F
@Test
public void testReservationThatDoesntFit() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1797,6 +1871,8 @@ public class TestFairScheduler extends F
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
@@ -1825,6 +1901,8 @@ public class TestFairScheduler extends F
@Test
public void testStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1865,6 +1943,8 @@ public class TestFairScheduler extends F
@Test
public void testCancelStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1915,6 +1995,8 @@ public class TestFairScheduler extends F
*/
@Test
public void testReservationsStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1955,6 +2037,8 @@ public class TestFairScheduler extends F
@Test
public void testNoMoreCpuOnNode() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
@@ -1976,6 +2060,8 @@ public class TestFairScheduler extends F
@Test
public void testBasicDRFAssignment() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
@@ -2016,6 +2102,8 @@ public class TestFairScheduler extends F
*/
@Test
public void testBasicDRFWithQueues() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
@@ -2052,6 +2140,8 @@ public class TestFairScheduler extends F
@Test
public void testDRFHierarchicalQueues() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
@@ -2120,7 +2210,9 @@ public class TestFairScheduler extends F
public void testHostPortNodeName() throws Exception {
conf.setBoolean(YarnConfiguration
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
- scheduler.reinitialize(conf,
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf,
resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
1, "127.0.0.1", 1);
@@ -2200,9 +2292,11 @@ public class TestFairScheduler extends F
out.println("</user>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
verifyAppRunnable(attId1, true);
@@ -2254,9 +2348,11 @@ public class TestFairScheduler extends F
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true);
@@ -2316,6 +2412,9 @@ public class TestFairScheduler extends F
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
+ fs.setRMContext(resourceManager.getRMContext());
+ fs.init(conf);
+ fs.start();
fs.reinitialize(conf, resourceManager.getRMContext());
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
@@ -2396,6 +2495,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -2439,6 +2540,8 @@ public class TestFairScheduler extends F
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy
@@ -2455,6 +2558,8 @@ public class TestFairScheduler extends F
@SuppressWarnings("resource")
@Test
public void testBlacklistNodes() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final int GB = 1024;
@@ -2507,6 +2612,8 @@ public class TestFairScheduler extends F
@Test
public void testGetAppsInQueue() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttId1 =
@@ -2552,8 +2659,10 @@ public class TestFairScheduler extends F
@Test
public void testMoveRunnableApp() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2591,8 +2700,10 @@ public class TestFairScheduler extends F
@Test
public void testMoveNonRunnableApp() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2611,8 +2722,10 @@ public class TestFairScheduler extends F
@Test
public void testMoveMakesAppRunnable() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
@@ -2638,8 +2751,10 @@ public class TestFairScheduler extends F
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
QueueManager queueMgr = scheduler.getQueueManager();
queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0);
@@ -2652,8 +2767,10 @@ public class TestFairScheduler extends F
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
queueMgr.getLeafQueue("queue2", true);
@@ -2675,6 +2792,8 @@ public class TestFairScheduler extends F
@Test (expected = YarnException.class)
public void testMoveToNonexistentQueue() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("queue1", true);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java Sun Jun 1 19:11:23 2014
@@ -51,6 +51,8 @@ public class TestFairSchedulerEventLog {
resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
}
@@ -69,7 +71,13 @@ public class TestFairSchedulerEventLog {
public void tearDown() {
logFile.delete();
logFile.getParentFile().delete(); // fairscheduler/
- scheduler = null;
- resourceManager = null;
+ if (scheduler != null) {
+ scheduler.stop();
+ scheduler = null;
+ }
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sun Jun 1 19:11:23 2014
@@ -145,9 +145,13 @@ public class TestFifoScheduler {
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer);
- FifoScheduler schedular = new FifoScheduler();
- schedular.reinitialize(new Configuration(), rmContext);
- QueueMetrics metrics = schedular.getRootQueueMetrics();
+ FifoScheduler scheduler = new FifoScheduler();
+ Configuration conf = new Configuration();
+ scheduler.setRMContext(rmContext);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, rmContext);
+ QueueMetrics metrics = scheduler.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
@@ -155,18 +159,19 @@ public class TestFifoScheduler {
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
- schedular.handle(appEvent);
+ scheduler.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- schedular.handle(attemptEvent);
+ scheduler.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- schedular.handle(attemptEvent2);
+ scheduler.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
+ scheduler.stop();
}
@Test(timeout=2000)
@@ -184,6 +189,9 @@ public class TestFifoScheduler {
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler();
+ scheduler.setRMContext(rmContext);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
@@ -232,6 +240,7 @@ public class TestFifoScheduler {
//Also check that the containers were scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(3, info.getLiveContainers().size());
+ scheduler.stop();
}
@Test(timeout=2000)
@@ -254,6 +263,9 @@ public class TestFifoScheduler {
return nodes;
}
};
+ scheduler.setRMContext(rmContext);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(2048, 4), 1, "127.0.0.1");
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java?rev=1599025&r1=1599024&r2=1599025&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java Sun Jun 1 19:11:23 2014
@@ -203,10 +203,11 @@ public class TestRMWebApp {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+ cs.setRMContext(new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
+ cs.init(conf);
return cs;
}
@@ -269,19 +270,21 @@ public class TestRMWebApp {
ResourceManager rm = mock(ResourceManager.class);
RMContext rmContext = mockRMContext(apps, racks, nodes,
mbsPerNode);
- ResourceScheduler rs = mockFifoScheduler();
+ ResourceScheduler rs = mockFifoScheduler(rmContext);
when(rm.getResourceScheduler()).thenReturn(rs);
when(rm.getRMContext()).thenReturn(rmContext);
return rm;
}
- public static FifoScheduler mockFifoScheduler() throws Exception {
+ public static FifoScheduler mockFifoScheduler(RMContext rmContext)
+ throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupFifoQueueConfiguration(conf);
FifoScheduler fs = new FifoScheduler();
fs.setConf(new YarnConfiguration());
- fs.reinitialize(conf, null);
+ fs.setRMContext(rmContext);
+ fs.init(conf);
return fs;
}