You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC
svn commit: r1602947 [4/5] - in
/hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Mon Jun 16 18:13:57 2014
@@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
@@ -228,4 +230,11 @@ public class FSParentQueue extends FSQue
// Should never be called since all applications are submitted to LeafQueues
return null;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -117,7 +118,6 @@ import com.google.common.annotations.Vis
@SuppressWarnings("unchecked")
public class FairScheduler extends
AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
- private boolean initialized;
private FairSchedulerConfiguration conf;
private Resource incrAllocation;
@@ -137,6 +137,11 @@ public class FairScheduler extends
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
+ private Thread updateThread;
+ private Thread schedulingThread;
+ // timeout to join when we stop this service
+ protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
// Aggregate metrics
FSQueueMetrics rootMetrics;
@@ -182,6 +187,7 @@ public class FairScheduler extends
AllocationConfiguration allocConf;
public FairScheduler() {
+ super(FairScheduler.class.getName());
clock = new SystemClock();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
@@ -473,7 +479,8 @@ public class FairScheduler extends
return resToPreempt;
}
- public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+ public synchronized RMContainerTokenSecretManager
+ getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
@@ -829,6 +836,12 @@ public class FairScheduler extends
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+ // Set amResource for this app
+ if (!application.getUnmanagedAM() && ask.size() == 1
+ && application.getLiveContainers().isEmpty()) {
+ application.setAMResource(ask.get(0).getCapability());
+ }
+
// Release containers
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
@@ -1059,8 +1072,8 @@ public class FairScheduler extends
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {
return (preemptionUtilizationThreshold < Math.max(
- (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(),
- (float) rootMetrics.getAvailableVirtualCores() /
+ (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+ (float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}
return false;
@@ -1154,87 +1167,130 @@ public class FairScheduler extends
// NOT IMPLEMENTED
}
- @Override
- public synchronized void reinitialize(Configuration conf, RMContext rmContext)
- throws IOException {
- if (!initialized) {
- this.conf = new FairSchedulerConfiguration(conf);
- validateConf(this.conf);
- minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
- incrAllocation = this.conf.getIncrementAllocation();
- continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
- nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
- rackLocalityThreshold = this.conf.getLocalityThresholdRack();
- nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
- rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
- preemptionEnabled = this.conf.getPreemptionEnabled();
- preemptionUtilizationThreshold =
- this.conf.getPreemptionUtilizationThreshold();
- assignMultiple = this.conf.getAssignMultiple();
- maxAssign = this.conf.getMaxAssign();
- sizeBasedWeight = this.conf.getSizeBasedWeight();
- preemptionInterval = this.conf.getPreemptionInterval();
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
- usePortForNodeName = this.conf.getUsePortForNodeName();
-
- rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
- this.rmContext = rmContext;
- // This stores per-application scheduling information
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
- this.eventLog = new FairSchedulerEventLog();
- eventLog.init(this.conf);
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
- initialized = true;
+ private synchronized void initScheduler(Configuration conf)
+ throws IOException {
+ this.conf = new FairSchedulerConfiguration(conf);
+ validateConf(this.conf);
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
+ incrAllocation = this.conf.getIncrementAllocation();
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+ continuousSchedulingSleepMs =
+ this.conf.getContinuousSchedulingSleepMs();
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+ preemptionEnabled = this.conf.getPreemptionEnabled();
+ preemptionUtilizationThreshold =
+ this.conf.getPreemptionUtilizationThreshold();
+ assignMultiple = this.conf.getAssignMultiple();
+ maxAssign = this.conf.getMaxAssign();
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+ usePortForNodeName = this.conf.getUsePortForNodeName();
+
+ rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+ // This stores per-application scheduling information
+ this.applications =
+ new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
+ this.eventLog = new FairSchedulerEventLog();
+ eventLog.init(this.conf);
- allocConf = new AllocationConfiguration(conf);
- try {
- queueMgr.initialize(conf);
- } catch (Exception e) {
- throw new IOException("Failed to start FairScheduler", e);
- }
+ allocConf = new AllocationConfiguration(conf);
+ try {
+ queueMgr.initialize(conf);
+ } catch (Exception e) {
+ throw new IOException("Failed to start FairScheduler", e);
+ }
- Thread updateThread = new Thread(new UpdateThread());
- updateThread.setName("FairSchedulerUpdateThread");
- updateThread.setDaemon(true);
- updateThread.start();
+ updateThread = new Thread(new UpdateThread());
+ updateThread.setName("FairSchedulerUpdateThread");
+ updateThread.setDaemon(true);
- if (continuousSchedulingEnabled) {
- // start continuous scheduling thread
- Thread schedulingThread = new Thread(
+ if (continuousSchedulingEnabled) {
+ // start continuous scheduling thread
+ schedulingThread = new Thread(
new Runnable() {
@Override
public void run() {
continuousScheduling();
}
}
- );
- schedulingThread.setName("ContinuousScheduling");
- schedulingThread.setDaemon(true);
- schedulingThread.start();
+ );
+ schedulingThread.setName("ContinuousScheduling");
+ schedulingThread.setDaemon(true);
+ }
+
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationReloadListener());
+ // If we fail to load allocations file on initialize, we want to fail
+ // immediately. After a successful load, exceptions on future reloads
+ // will just result in leaving things as they are.
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize FairScheduler", e);
+ }
+ }
+
+ private synchronized void startSchedulerThreads() {
+ Preconditions.checkNotNull(updateThread, "updateThread is null");
+ Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+ updateThread.start();
+ if (continuousSchedulingEnabled) {
+ Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+ schedulingThread.start();
+ }
+ allocsLoader.start();
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ startSchedulerThreads();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized (this) {
+ if (updateThread != null) {
+ updateThread.interrupt();
+ updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
-
- allocsLoader.init(conf);
- allocsLoader.setReloadListener(new AllocationReloadListener());
- // If we fail to load allocations file on initialize, we want to fail
- // immediately. After a successful load, exceptions on future reloads
- // will just result in leaving things as they are.
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- throw new IOException("Failed to initialize FairScheduler", e);
+ if (continuousSchedulingEnabled) {
+ if (schedulingThread != null) {
+ schedulingThread.interrupt();
+ schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+ }
}
- allocsLoader.start();
- } else {
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- LOG.error("Failed to reload allocations file", e);
+ if (allocsLoader != null) {
+ allocsLoader.stop();
}
}
+
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ LOG.error("Failed to reload allocations file", e);
+ }
}
@Override
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun 16 18:13:57 2014
@@ -149,4 +149,15 @@ public abstract class SchedulingPolicy {
*/
public abstract boolean checkIfUsageOverFairShare(
Resource usage, Resource fairShare);
+
+ /**
+ * Check if a leaf queue's AM resource usage is over its limit under this policy
+ *
+ * @param usage {@link Resource} the resource used by application masters
+ * @param maxAMResource {@link Resource} the maximum allowed resource for
+ * application masters
+ * @return true if AM resource usage is over the limit
+ */
+ public abstract boolean checkIfAMResourceUsageOverLimit(
+ Resource usage, Resource maxAMResource);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Mon Jun 16 18:13:57 2014
@@ -75,6 +75,11 @@ public class DominantResourceFairnessPol
}
@Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return !Resources.fitsIn(usage, maxAMResource);
+ }
+
+ @Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun 16 18:13:57 2014
@@ -125,6 +125,11 @@ public class FairSharePolicy extends Sch
}
@Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return usage.getMemory() > maxAMResource.getMemory();
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Mon Jun 16 18:13:57 2014
@@ -95,6 +95,11 @@ public class FifoPolicy extends Scheduli
}
@Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return usage.getMemory() > maxAMResource.getMemory();
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Jun 16 18:13:57 2014
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -111,7 +113,6 @@ public class FifoScheduler extends
Configuration conf;
- private boolean initialized;
private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager;
@@ -178,8 +179,60 @@ public class FifoScheduler extends
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ increaseUsedResources(rmContainer);
+ updateAppHeadRoom(schedulerAttempt);
+ updateAvailableResourcesMetrics();
+ }
};
+ public FifoScheduler() {
+ super(FifoScheduler.class.getName());
+ }
+
+ private synchronized void initScheduler(Configuration conf) {
+ validateConf(conf);
+ //Use ConcurrentSkipListMap because applications need to be ordered
+ this.applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ this.minimumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ this.maximumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ this.usePortForNodeName = conf.getBoolean(
+ YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+ this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+ conf);
+ this.activeUsersManager = new ActiveUsersManager(metrics);
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
@Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
@@ -216,35 +269,17 @@ public class FifoScheduler extends
}
@Override
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
setConf(conf);
- if (!this.initialized) {
- validateConf(conf);
- this.rmContext = rmContext;
- //Use ConcurrentSkipListMap because applications need to be ordered
- this.applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
- this.minimumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- this.maximumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- this.usePortForNodeName = conf.getBoolean(
- YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
- this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
- conf);
- this.activeUsersManager = new ActiveUsersManager(metrics);
- this.initialized = true;
- }
}
-
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@@ -465,7 +500,7 @@ public class FifoScheduler extends
if (attempt == null) {
continue;
}
- attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ updateAppHeadRoom(attempt);
}
}
@@ -636,11 +671,10 @@ public class FifoScheduler extends
application.allocate(type, node, priority, request, container);
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- rmContainer);
+ node.allocateContainer(rmContainer);
// Update usage for this container
- Resources.addTo(usedResource, capability);
+ increaseUsedResources(rmContainer);
}
}
@@ -684,9 +718,22 @@ public class FifoScheduler extends
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
-
- metrics.setAvailableResourcesToQueue(
- Resources.subtract(clusterResource, usedResource));
+
+ updateAvailableResourcesMetrics();
+ }
+
+ private void increaseUsedResources(RMContainer rmContainer) {
+ Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+ }
+
+ private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+ schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+ usedResource));
+ }
+
+ private void updateAvailableResourcesMetrics() {
+ metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+ usedResource));
}
@Override
@@ -696,6 +743,9 @@ public class FifoScheduler extends
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
+
}
break;
case NODE_REMOVED:
@@ -900,4 +950,8 @@ public class FifoScheduler extends
return null;
}
}
+
+ public Resource getUsedResource() {
+ return usedResource;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Mon Jun 16 18:13:57 2014
@@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web
import java.util.Collection;
import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringEscapeUtils;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte
super(ctx);
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
fsinfo = new FairSchedulerInfo(scheduler);
- apps = rmContext.getRMApps();
+ apps = new ConcurrentHashMap<ApplicationId, RMApp>();
+ for (Map.Entry<ApplicationId, RMApp> entry : rmContext.getRMApps().entrySet()) {
+ if (!(RMAppState.NEW.equals(entry.getValue().getState())
+ || RMAppState.NEW_SAVING.equals(entry.getValue().getState())
+ || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) {
+ apps.put(entry.getKey(), entry.getValue());
+ }
+ }
this.conf = conf;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Mon Jun 16 18:13:57 2014
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
@@ -31,19 +34,27 @@ import java.util.concurrent.ConcurrentMa
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -56,6 +67,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -66,10 +79,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
@@ -82,7 +95,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
-import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -584,4 +596,166 @@ public class RMWebServices {
return appAttemptsInfo;
}
+
+  /**
+   * Reports the current state of the application named by {@code appid}.
+   *
+   * @param hsr the servlet request, used only to identify the remote user
+   * @param appId the application id taken from the request path
+   * @return an {@link AppState} carrying the application's state as a string
+   * @throws NotFoundException if the id cannot be resolved to a known
+   *           application; the failure is written to the audit log first
+   */
+  @GET
+  @Path("/apps/{appid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public AppState getAppState(@Context HttpServletRequest hsr,
+      @PathParam("appid") String appId) throws AuthorizationException {
+    init();
+    UserGroupInformation caller = getCallerUserGroupInformation(hsr);
+    // Unauthenticated callers are still served; they are audited under an
+    // empty user name.
+    String auditUser = (caller == null) ? "" : caller.getUserName();
+
+    RMApp target;
+    try {
+      target = getRMAppForAppId(appId);
+    } catch (NotFoundException e) {
+      // NOTE(review): the audit operation is KILL_APP_REQUEST although this
+      // is a read-only lookup -- confirm whether a dedicated constant exists.
+      RMAuditLogger.logFailure(auditUser, AuditConstants.KILL_APP_REQUEST,
+          "UNKNOWN", "RMWebService",
+          "Trying to get state of an absent application " + appId);
+      throw e;
+    }
+
+    AppState current = new AppState();
+    current.setState(target.getState().toString());
+    return current;
+  }
+
+  /**
+   * Handles a request to change the state of an application. The only
+   * supported transition is to {@code KILLED}, which is delegated to
+   * {@link #killApp}.
+   *
+   * This method returns a {@link Response} rather than a POJO because a POJO
+   * return always produces status 200, and a kill that is still in progress
+   * has to be reported as 202.
+   *
+   * @param targetState the requested state, unmarshalled from the request body
+   * @param hsr the servlet request, used to identify the remote user
+   * @param appId the application id taken from the request path
+   * @return 200 with the current state when the requested state already
+   *         matches it; otherwise whatever {@link #killApp} returns
+   * @throws AuthorizationException if the request carries no remote user
+   * @throws NotFoundException if the id cannot be resolved to a known
+   *           application; the failure is written to the audit log first
+   * @throws BadRequestException if the requested state differs from the
+   *           current one and is anything other than {@code KILLED},
+   *           including a missing state
+   */
+  @PUT
+  @Path("/apps/{appid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response updateAppState(AppState targetState,
+      @Context HttpServletRequest hsr, @PathParam("appid") String appId)
+      throws AuthorizationException, YarnException, InterruptedException,
+      IOException {
+
+    init();
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    String userName = callerUGI.getUserName();
+    RMApp app = null;
+    try {
+      app = getRMAppForAppId(appId);
+    } catch (NotFoundException e) {
+      RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
+          "UNKNOWN", "RMWebService", "Trying to kill/move an absent application "
+              + appId);
+      throw e;
+    }
+
+    // A body without a state (or no body at all) must end up as a 400 below,
+    // not as a NullPointerException, so the requested value is read once and
+    // only ever used as the argument of equals().
+    String requestedState =
+        (targetState == null) ? null : targetState.getState();
+    String currentState = app.getState().toString();
+
+    if (!currentState.equals(requestedState)) {
+      // user is attempting to change state. right now we only
+      // allow users to kill the app
+
+      if (YarnApplicationState.KILLED.toString().equals(requestedState)) {
+        return killApp(app, callerUGI, hsr);
+      }
+      throw new BadRequestException("Only '"
+          + YarnApplicationState.KILLED.toString()
+          + "' is allowed as a target state.");
+    }
+
+    AppState ret = new AppState();
+    ret.setState(currentState);
+
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  /**
+   * Kills {@code app} on behalf of {@code callerUGI} and turns the outcome
+   * into an HTTP response.
+   *
+   * @param app the application to kill; must not be null
+   * @param callerUGI the identity the kill is executed as
+   * @param hsr the servlet request; its URL becomes the Location header of a
+   *          202 response
+   * @return 403 with a plain-text message when the kill was rejected for lack
+   *         of permission, 202 while the kill has not completed, 200 (after
+   *         writing an audit success record) once it has
+   * @throws IllegalArgumentException if {@code app} is null
+   * @throws UndeclaredThrowableException for any failure of the privileged
+   *           action that is not a permission problem
+   */
+  protected Response killApp(RMApp app, UserGroupInformation callerUGI,
+      HttpServletRequest hsr) throws IOException, InterruptedException {
+
+    if (app == null) {
+      throw new IllegalArgumentException("app cannot be null");
+    }
+    String userName = callerUGI.getUserName();
+    final ApplicationId appid = app.getApplicationId();
+
+    PrivilegedExceptionAction<KillApplicationResponse> killAction =
+        new PrivilegedExceptionAction<KillApplicationResponse>() {
+          @Override
+          public KillApplicationResponse run() throws IOException,
+              YarnException {
+            return rm.getClientRMService().forceKillApplication(
+                KillApplicationRequest.newInstance(appid));
+          }
+        };
+
+    KillApplicationResponse resp;
+    try {
+      resp = callerUGI.doAs(killAction);
+    } catch (UndeclaredThrowableException ue) {
+      // Only a YarnException whose own cause is an AccessControlException is
+      // reported to the user; everything else propagates unchanged.
+      Throwable cause = ue.getCause();
+      boolean permissionDenied = cause instanceof YarnException
+          && cause.getCause() instanceof AccessControlException;
+      if (!permissionDenied) {
+        throw ue;
+      }
+      String appId = app.getApplicationId().toString();
+      String msg =
+          "Unauthorized attempt to kill appid " + appId
+              + " by remote user " + userName;
+      return Response.status(Status.FORBIDDEN).entity(msg).build();
+    }
+
+    AppState ret = new AppState();
+    ret.setState(app.getState().toString());
+
+    if (!resp.getIsKillCompleted()) {
+      // Kill accepted but still in flight: point the client back at this
+      // resource so it can poll the state.
+      return Response.status(Status.ACCEPTED).entity(ret)
+          .header(HttpHeaders.LOCATION, hsr.getRequestURL()).build();
+    }
+
+    RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
+        "RMWebService", app.getApplicationId());
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  /**
+   * Resolves an application id string to the matching {@link RMApp}.
+   *
+   * @param appId the textual application id
+   * @return the application registered under that id; never null
+   * @throws NotFoundException if the string is null or empty, cannot be
+   *           converted to an {@link ApplicationId}, or names no application
+   *           known to the RM context
+   */
+  private RMApp getRMAppForAppId(String appId) {
+
+    boolean missing = appId == null || appId.isEmpty();
+    if (missing) {
+      throw new NotFoundException("appId, " + appId + ", is empty or null");
+    }
+
+    ApplicationId parsedId;
+    try {
+      parsedId = ConverterUtils.toApplicationId(recordFactory, appId);
+    } catch (NumberFormatException e) {
+      throw new NotFoundException("appId is invalid");
+    }
+    if (parsedId == null) {
+      throw new NotFoundException("appId is invalid");
+    }
+
+    RMApp found = rm.getRMContext().getRMApps().get(parsedId);
+    if (found != null) {
+      return found;
+    }
+    throw new NotFoundException("app with id: " + appId + " not found");
+  }
+
+  /**
+   * Builds a {@link UserGroupInformation} for the remote user of a request.
+   *
+   * @param hsr the servlet request to inspect
+   * @return a remote-user UGI, or null when the request has no remote user
+   */
+  private UserGroupInformation getCallerUserGroupInformation(
+      HttpServletRequest hsr) {
+
+    String remoteUser = hsr.getRemoteUser();
+    if (remoteUser == null) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(remoteUser);
+  }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Mon Jun 16 18:13:57 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -105,14 +104,14 @@ public class MockNM {
}
public RegisterNodeManagerResponse registerNode(
- List<ContainerStatus> containerStatus) throws Exception{
+ List<NMContainerStatus> containerReports) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource);
- req.setContainerStatuses(containerStatus);
+ req.setContainerStatuses(containerReports);
req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
@@ -185,4 +184,11 @@ public class MockNM {
return heartbeatResponse;
}
+ /** Returns the memory value this mock node manager holds in its {@code memory} field. */
+ public int getMemory() {
+ return memory;
+ }
+
+ /** Returns the virtual-core count this mock node manager holds in its {@code vCores} field. */
+ public int getvCores() {
+ return vCores;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Mon Jun 16 18:13:57 2014
@@ -21,10 +21,9 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
+import java.util.List;
import java.util.Map;
-import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
@@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -69,6 +70,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -76,6 +81,7 @@ import org.apache.hadoop.yarn.util.Recor
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
@SuppressWarnings("unchecked")
public class MockRM extends ResourceManager {
@@ -144,11 +150,26 @@ public class MockRM extends ResourceMana
}
}
+  /**
+   * Blocks until {@code attempt} lists the given container among its
+   * just-finished containers, polling every 200 ms and printing each poll
+   * result to standard output.
+   *
+   * There is no upper bound on the wait; callers rely on their own test
+   * timeout if the container never shows up.
+   *
+   * @param attempt the application attempt to poll
+   * @param completedContainer the container whose completion is awaited
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public void waitForContainerToComplete(RMAppAttempt attempt,
+      NMContainerStatus completedContainer) throws InterruptedException {
+    for (;;) {
+      List<ContainerStatus> finished = attempt.getJustFinishedContainers();
+      System.out.println("Received completed containers " + finished);
+      for (ContainerStatus status : finished) {
+        boolean isAwaited = status.getContainerId().equals(
+            completedContainer.getContainerId());
+        if (isAwaited) {
+          return;
+        }
+      }
+      Thread.sleep(200);
+    }
+  }
+
public void waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId);
int timeoutSecs = 0;
- while(container == null && timeoutSecs++ < 20) {
+ while(container == null && timeoutSecs++ < 100) {
nm.nodeHeartbeat(true);
container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be allocated.");
@@ -333,7 +354,7 @@ public class MockRM extends ResourceMana
public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
- node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
}
public void sendNodeLost(MockNM nm) throws Exception {
@@ -542,4 +563,16 @@ public class MockRM extends ResourceMana
.newInstance(appId));
return response.getApplicationReport();
}
+
+ // Explicitly reset queue metrics for testing.
+ // Looks up the scheduler's entry for the given app and calls
+ // clearQueueMetrics() on the metrics object of that entry's queue.
+ // The resource scheduler is cast to AbstractYarnScheduler, and the result of
+ // getSchedulerApplications().get(...) is dereferenced without a null check,
+ // so the app must already be known to the scheduler.
+ // NOTE(review): the "static-access" suppression suggests clearQueueMetrics()
+ // is a static method reached through an instance, in which case it may reset
+ // more than this one queue -- confirm against QueueMetrics.
+ @SuppressWarnings("static-access")
+ public void clearQueueMetrics(RMApp app) {
+ ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) getResourceScheduler())
+ .getSchedulerApplications().get(app.getApplicationId()).getQueue()
+ .getMetrics().clearQueueMetrics();
+ }
+
+ /** Exposes this mock RM's {@code activeServices} field to tests. */
+ public RMActiveServices getRMActiveService() {
+ return activeServices;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Mon Jun 16 18:13:57 2014
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -259,6 +260,28 @@ public class TestClientRMService {
}
@Test
+  // Checks that an RMAppAttemptImpl built on a mocked scheduler and RM context
+  // reports RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT as its
+  // resource usage. NOTE(review): this presumably depends on the scheduler
+  // mock returning null for attempt 1's usage report -- confirm against
+  // mockYarnScheduler().
+  public void testGetApplicationResourceUsageReportDummy() throws YarnException,
+      IOException {
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    YarnScheduler yarnScheduler = mockYarnScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    // The attempt only needs an event handler to exist; events are discarded.
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          @Override
+          public void handle(Event event) {
+          }
+        });
+    ApplicationSubmissionContext asContext =
+        mock(ApplicationSubmissionContext.class);
+    YarnConfiguration config = new YarnConfiguration();
+    RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
+        rmContext, yarnScheduler, null, asContext, config, false);
+    ApplicationResourceUsageReport report = rmAppAttemptImpl
+        .getApplicationResourceUsageReport();
+    // JUnit's contract is assertEquals(expected, actual); keeping that order
+    // makes a failure message name the right value as "expected".
+    assertEquals(RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT, report);
+  }
+
+ @Test
public void testGetApplicationAttempts() throws YarnException, IOException {
ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -646,7 +669,8 @@ public class TestClientRMService {
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+
+ long[] submitTimeMillis = new long[3];
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
@@ -656,6 +680,7 @@ public class TestClientRMService {
appId, appNames[i], queues[i % queues.length],
new HashSet<String>(tags.subList(0, i + 1)));
rmService.submitApplication(submitRequest);
+ submitTimeMillis[i] = System.currentTimeMillis();
}
// Test different cases of ClientRMService#getApplications()
@@ -667,6 +692,24 @@ public class TestClientRMService {
request.setLimit(1L);
assertEquals("Failed to limit applications", 1,
rmService.getApplications(request).getApplicationList().size());
+
+ // Check start range
+ request = GetApplicationsRequest.newInstance();
+ request.setStartRange(submitTimeMillis[0], System.currentTimeMillis());
+
+ // 2 applications are submitted after the first timeMillis
+ assertEquals("Incorrect number of matching start range",
+ 2, rmService.getApplications(request).getApplicationList().size());
+
+ // 1 application is submitted after the second timeMillis
+ request.setStartRange(submitTimeMillis[1], System.currentTimeMillis());
+ assertEquals("Incorrect number of matching start range",
+ 1, rmService.getApplications(request).getApplicationList().size());
+
+ // no application is submitted after the third timeMillis
+ request.setStartRange(submitTimeMillis[2], System.currentTimeMillis());
+ assertEquals("Incorrect number of matching start range",
+ 0, rmService.getApplications(request).getApplicationList().size());
// Check queue
request = GetApplicationsRequest.newInstance();
@@ -944,6 +987,8 @@ public class TestClientRMService {
Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
Arrays.asList(getApplicationAttemptId(103)));
+ ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+ when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
return yarnScheduler;
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,16 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -77,12 +77,12 @@ public class TestFifoScheduler {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- ResourceScheduler scheduler = new FifoScheduler();
+ FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@@ -218,6 +218,9 @@ public class TestFifoScheduler {
public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
FifoScheduler scheduler = new FifoScheduler();
MockRM rm = new MockRM(conf);
+ scheduler.setRMContext(rm.getRMContext());
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, rm.getRMContext());
RMNode node = MockNodes.newNodeInfo(1,
@@ -293,7 +296,12 @@ public class TestFifoScheduler {
conf.setQueues("default", new String[] {"default"});
conf.setCapacity("default", 100);
FifoScheduler fs = new FifoScheduler();
+ fs.init(conf);
+ fs.start();
+ // mock rmContext to avoid NPE.
+ RMContext context = mock(RMContext.class);
fs.reinitialize(conf, null);
+ fs.setRMContext(context);
RMNode n1 =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
@@ -313,6 +321,7 @@ public class TestFifoScheduler {
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+ fs.stop();
}
@Test (timeout = 50000)
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Mon Jun 16 18:13:57 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
public class TestMoveApplication {
private ResourceManager resourceManager = null;
private static boolean failMove;
-
+ private Configuration conf;
+
@Before
public void setUp() throws Exception {
- Configuration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
FifoSchedulerWithMove.class);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
}
}
- @Test (timeout = 5000)
- public void testMoveSuccessful() throws Exception {
- // Submit application
- Application application = new Application("user1", resourceManager);
- ApplicationId appId = application.getApplicationId();
- application.submit();
-
- // Wait for app to be accepted
- RMApp app = resourceManager.rmContext.getRMApps().get(appId);
- while (app.getState() != RMAppState.ACCEPTED) {
- Thread.sleep(100);
- }
-
- ClientRMService clientRMService = resourceManager.getClientRMService();
+ @Test (timeout = 10000)
+ public
+ void testMoveSuccessful() throws Exception {
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app = rm1.submitApp(1024);
+ ClientRMService clientRMService = rm1.getClientRMService();
// FIFO scheduler does not support moves
- clientRMService.moveApplicationAcrossQueues(
- MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-
- RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+ clientRMService
+ .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+ .newInstance(app.getApplicationId(), "newqueue"));
+
+ RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
assertEquals("newqueue", rmApp.getQueue());
+ rm1.stop();
}
-
+
@Test
public void testMoveRejectedByPermissions() throws Exception {
failMove = true;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Jun 16 18:13:57 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
- node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node2.handle(new RMNodeStartedEvent(null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(null, null));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jun 16 18:13:57 2014
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,13 +68,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -82,8 +84,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -303,13 +305,11 @@ public class TestRMRestart {
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+ NMContainerStatus status =
+ TestRMRestart
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status));
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -510,14 +510,11 @@ public class TestRMRestart {
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
-
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ NMContainerStatus status =
+ TestRMRestart.createNMContainerStatus(
+ am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status));
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -615,7 +612,7 @@ public class TestRMRestart {
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
if (count == 0) {
// do nothing; simulate app final state is not saved.
LOG.info(appId + " final state is not saved.");
@@ -763,14 +760,14 @@ public class TestRMRestart {
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request.
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request.
}
};
@@ -1678,13 +1675,12 @@ public class TestRMRestart {
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ NMContainerStatus status =
+ TestRMRestart
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status));
while (loadedApp1.getAppAttempts().size() != 2) {
Thread.sleep(200);
}
@@ -1808,12 +1804,10 @@ public class TestRMRestart {
// ResourceTrackerService is started.
super.serviceStart();
nm1.setResourceTrackerService(getResourceTrackerService());
- List<ContainerStatus> status = new ArrayList<ContainerStatus>();
- ContainerId amContainer =
- ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
- status.add(ContainerStatus.newInstance(amContainer,
- ContainerState.COMPLETE, "AM container exit", 143));
- nm1.registerNode(status);
+ NMContainerStatus status =
+ TestRMRestart.createNMContainerStatus(
+ am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(status));
}
};
}
@@ -1852,6 +1846,15 @@ public class TestRMRestart {
}
}
+ public static NMContainerStatus createNMContainerStatus(
+ ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+ NMContainerStatus containerReport =
+ NMContainerStatus.newInstance(containerId, containerState,
+ Resource.newInstance(1024, 1), "recover container", 0);
+ return containerReport;
+ }
+
public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0;
public int updateApp = 0;
@@ -1859,7 +1862,7 @@ public class TestRMRestart {
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData);
}
@@ -1868,7 +1871,7 @@ public class TestRMRestart {
public synchronized void
updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptId,
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Mon Jun 16 18:13:57 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -487,33 +488,37 @@ public class TestResourceTrackerService
RMApp app = rm.submitApp(1024, true);
// Case 1.1: AppAttemptId is null
- ContainerStatus status = ContainerStatus.newInstance(
- ContainerId.newInstance(ApplicationAttemptId.newInstance(
- app.getApplicationId(), 2), 1),
- ContainerState.COMPLETE, "Dummy Completed", 0);
- rm.getResourceTrackerService().handleContainerStatus(status);
+ NMContainerStatus report =
+ NMContainerStatus.newInstance(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
verify(handler, never()).handle((Event) any());
// Case 1.2: Master container is null
RMAppAttemptImpl currentAttempt =
(RMAppAttemptImpl) app.getCurrentAppAttempt();
currentAttempt.setMasterContainer(null);
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
- ContainerState.COMPLETE, "Dummy Completed", 0);
- rm.getResourceTrackerService().handleContainerStatus(status);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
verify(handler, never()).handle((Event)any());
// Case 2: Managed AM
app = rm.submitApp(1024);
// Case 2.1: AppAttemptId is null
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(ApplicationAttemptId.newInstance(
- app.getApplicationId(), 2), 1),
- ContainerState.COMPLETE, "Dummy Completed", 0);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0);
try {
- rm.getResourceTrackerService().handleContainerStatus(status);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
} catch (Exception e) {
// expected - ignore
}
@@ -523,11 +528,12 @@ public class TestResourceTrackerService
currentAttempt =
(RMAppAttemptImpl) app.getCurrentAppAttempt();
currentAttempt.setMasterContainer(null);
- status = ContainerStatus.newInstance(
- ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
- ContainerState.COMPLETE, "Dummy Completed", 0);
+ report = NMContainerStatus.newInstance(
+ ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+ ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+ "Dummy Completed", 0);
try {
- rm.getResourceTrackerService().handleContainerStatus(status);
+ rm.getResourceTrackerService().handleNMContainerStatus(report);
} catch (Exception e) {
// expected - ignore
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Mon Jun 16 18:13:57 2014
@@ -17,6 +17,25 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
@@ -27,12 +46,16 @@ import java.util.Random;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -52,17 +75,6 @@ import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
@@ -424,6 +436,36 @@ public class TestProportionalCapacityPre
assert containers.get(4).equals(rm5);
}
+
+ @Test
+ public void testPolicyInitializeAfterSchedulerInitialized() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+
+ @SuppressWarnings("resource")
+ MockRM rm = new MockRM(conf);
+ rm.init(conf);
+
+ // ProportionalCapacityPreemptionPolicy should be initialized after
+ // CapacityScheduler is initialized. To verify this, we
+ // 1) find the SchedulingMonitor in RMActiveService's service list, and
+ // 2) check whether the ResourceCalculator in the policy is null.
+ // If it is not null, we can conclude that the policy was initialized
+ // after the scheduler was initialized.
+ for (Service service : rm.getRMActiveService().getServices()) {
+ if (service instanceof SchedulingMonitor) {
+ ProportionalCapacityPreemptionPolicy policy =
+ (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
+ .getSchedulingEditPolicy();
+ assertNotNull(policy.getResourceCalculator());
+ return;
+ }
+ }
+
+ fail("Failed to find SchedulingMonitor service, please check what happened");
+ }
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {