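You are viewing a plain-text version of this content. The canonical version is the original message in the common-commits@hadoop.apache.org mailing-list archive (its hyperlink was lost in this plain-text conversion).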
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/21 04:17:54 UTC
hadoop git commit: YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk a064865ab -> 754cb4e30
YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/754cb4e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/754cb4e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/754cb4e3
Branch: refs/heads/trunk
Commit: 754cb4e30fac1c5fe8d44626968c0ddbfe459335
Parents: a064865
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Oct 20 21:17:48 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Oct 20 21:17:48 2016 -0700
----------------------------------------------------------------------
.../scheduler/AbstractYarnScheduler.java | 186 ++++++++++++++++++-
.../scheduler/capacity/CapacityScheduler.java | 122 ++----------
.../scheduler/fair/FairScheduler.java | 80 +-------
.../scheduler/fifo/FifoScheduler.java | 94 +++-------
...estProportionalCapacityPreemptionPolicy.java | 4 +-
5 files changed, 225 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 645e06d..df59556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -73,7 +74,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
@@ -94,10 +100,14 @@ public abstract class AbstractYarnScheduler
protected Resource minimumAllocation;
protected volatile RMContext rmContext;
-
+
private volatile Priority maxClusterLevelAppPriority;
protected ActivitiesManager activitiesManager;
+ protected SchedulerHealth schedulerHealth = new SchedulerHealth();
+ protected volatile long lastNodeUpdateTime;
+
+ private volatile Clock clock;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
@@ -130,6 +140,7 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
+ clock = SystemClock.getInstance();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@@ -228,13 +239,25 @@ public abstract class AbstractYarnScheduler
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
+ public SchedulerHealth getSchedulerHealth() {
+ return this.schedulerHealth;
+ }
+
+ protected void setLastNodeUpdateTime(long time) {
+ this.lastNodeUpdateTime = time;
+ }
+
+ public long getLastNodeUpdateTime() {
+ return lastNodeUpdateTime;
+ }
+
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
try {
readLock.lock();
// Get the application for the finished container
- SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
- containerId);
+ SchedulerApplicationAttempt application =
+ getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
.getApplicationId() + " launched container " + containerId
@@ -249,7 +272,7 @@ public abstract class AbstractYarnScheduler
readLock.unlock();
}
}
-
+
protected void containerIncreasedOnNode(ContainerId containerId,
SchedulerNode node, Container increasedContainerReportedByNM) {
/*
@@ -276,6 +299,7 @@ public abstract class AbstractYarnScheduler
}
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
increasedContainerReportedByNM.getResource()));
+
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
@@ -360,7 +384,7 @@ public abstract class AbstractYarnScheduler
}
}
- public void recoverContainersOnNode(
+ public synchronized void recoverContainersOnNode(
List<NMContainerStatus> containerReports, RMNode nm) {
try {
writeLock.lock();
@@ -475,7 +499,7 @@ public abstract class AbstractYarnScheduler
}
/**
- * Recover resource request back from RMContainer when a container is
+ * Recover resource request back from RMContainer when a container is
* preempted before AM pulled the same. If container is pulled by
* AM, then RMContainer will not have resource request to recover.
* @param rmContainer rmContainer
@@ -621,7 +645,7 @@ public abstract class AbstractYarnScheduler
SchedulerApplicationAttempt attempt);
@Override
- public SchedulerNode getSchedulerNode(NodeId nodeId) {
+ public N getSchedulerNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
}
@@ -832,4 +856,152 @@ public abstract class AbstractYarnScheduler
return this.activitiesManager;
}
+ public Clock getClock() {
+ return clock;
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @Lock(Lock.NoLock.class)
+ public N getNode(NodeId nodeId) {
+ return nodeTracker.getNode(nodeId);
+ }
+
+ /**
+ * Pull container updates from the RMNode; process launched and increased ones.
+ * @param nm The RMNode corresponding to the NodeManager
+ * @return list of completed containers
+ */
+ protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
+ SchedulerNode node = getNode(nm.getNodeID());
+
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers =
+ new ArrayList<>();
+ List<ContainerStatus> completedContainers =
+ new ArrayList<>();
+
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers
+ .addAll(containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
+
+ // Processing the newly launched containers
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+ }
+
+ // Processing the newly increased containers
+ List<Container> newlyIncreasedContainers =
+ nm.pullNewlyIncreasedContainers();
+ for (Container container : newlyIncreasedContainers) {
+ containerIncreasedOnNode(container.getId(), node, container);
+ }
+
+ return completedContainers;
+ }
+
+ /**
+ * Finish each completed container and tally the resources it released.
+ * @param completedContainers Completed containers reported by the node
+ * @param releasedResources Accumulator for released allocated/reserved resources
+ * @return The number of completed containers known to the scheduler
+ */
+ protected int updateCompletedContainers(List<ContainerStatus>
+ completedContainers, Resource releasedResources) {
+ int releasedContainers = 0;
+ for (ContainerStatus completedContainer : completedContainers) {
+ ContainerId containerId = completedContainer.getContainerId();
+ LOG.debug("Container FINISHED: " + containerId);
+ RMContainer container = getRMContainer(containerId);
+ completedContainer(container,
+ completedContainer, RMContainerEventType.FINISHED);
+ if (container != null) {
+ releasedContainers++;
+ Resource ars = container.getAllocatedResource();
+ if (ars != null) {
+ Resources.addTo(releasedResources, ars);
+ }
+ Resource rrs = container.getReservedResource();
+ if (rrs != null) {
+ Resources.addTo(releasedResources, rrs);
+ }
+ }
+ }
+ return releasedContainers;
+ }
+
+ /**
+ * Record release details (stamped with the last node update time) and counts.
+ * @param releasedResources Total resources released by completed containers
+ * @param releasedContainers Count of released containers
+ */
+ protected void updateSchedulerHealthInformation(Resource releasedResources,
+ int releasedContainers) {
+
+ schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(),
+ releasedResources);
+ schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
+ }
+
+ /**
+ * Copy the container and node utilization reported by an RMNode to its SchedulerNode.
+ * @param nm The RMNode whose reported utilization is copied
+ */
+ protected void updateNodeResourceUtilization(RMNode nm) {
+ SchedulerNode node = getNode(nm.getNodeID());
+ // Updating node resource utilization
+ node.setAggregatedContainersUtilization(
+ nm.getAggregatedContainersUtilization());
+ node.setNodeUtilization(nm.getNodeUtilization());
+
+ }
+
+ /**
+ * Process a heartbeat update from a node.
+ * @param nm The RMNode corresponding to the NodeManager
+ */
+ protected synchronized void nodeUpdate(RMNode nm) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("nodeUpdate: " + nm +
+ " cluster capacity: " + getClusterResource());
+ }
+
+ // Process new container information
+ List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
+
+ // Process completed containers
+ Resource releasedResources = Resource.newInstance(0, 0);
+ int releasedContainers = updateCompletedContainers(completedContainers,
+ releasedResources);
+
+ // If the node is decommissioning, send an update to have the total
+ // resource equal to the used resource, so no available resource to
+ // schedule.
+ // TODO YARN-5128: Fix possible race-condition when request comes in before
+ // update is propagated
+ if (nm.getState() == NodeState.DECOMMISSIONING) {
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+ .newInstance(getSchedulerNode(nm.getNodeID())
+ .getAllocatedResource(), 0)));
+ }
+
+ updateSchedulerHealthInformation(releasedResources, releasedContainers);
+ updateNodeResourceUtilization(nm);
+
+ // Now node data structures are up-to-date and ready for scheduling.
+ if(LOG.isDebugEnabled()) {
+ SchedulerNode node = getNode(nm.getNodeID());
+ LOG.debug("Node being looked for scheduling " + nm +
+ " availableResource: " + node.getUnallocatedResource());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6d00bee..cfdcb10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -89,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -235,8 +231,6 @@ public class CapacityScheduler extends
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
private RMNodeLabelsManager labelManager;
- private SchedulerHealth schedulerHealth = new SchedulerHealth();
- volatile long lastNodeUpdateTime;
/**
* EXPERT
@@ -1099,93 +1093,24 @@ public class CapacityScheduler extends
return root.getQueueUserAclInfo(user);
}
- private void nodeUpdate(RMNode nm) {
+ @Override
+ protected synchronized void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "nodeUpdate: " + nm + " clusterResources: " + getClusterResource());
- }
-
- Resource releaseResources = Resource.newInstance(0, 0);
-
- FiCaSchedulerNode node = getNode(nm.getNodeID());
-
- List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
- List<ContainerStatus> newlyLaunchedContainers =
- new ArrayList<ContainerStatus>();
- List<ContainerStatus> completedContainers =
- new ArrayList<ContainerStatus>();
- for (UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(
- containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
-
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
-
- // Processing the newly increased containers
- List<Container> newlyIncreasedContainers =
- nm.pullNewlyIncreasedContainers();
- for (Container container : newlyIncreasedContainers) {
- containerIncreasedOnNode(container.getId(), node, container);
- }
-
- // Process completed containers
- int releasedContainers = 0;
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- RMContainer container = getRMContainer(containerId);
- super.completedContainer(container, completedContainer,
- RMContainerEventType.FINISHED);
- if (container != null) {
- releasedContainers++;
- Resource rs = container.getAllocatedResource();
- if (rs != null) {
- Resources.addTo(releaseResources, rs);
- }
- rs = container.getReservedResource();
- if (rs != null) {
- Resources.addTo(releaseResources, rs);
- }
- }
- }
-
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- // TODO: Fix possible race-condition when request comes in before
- // update is propagated
- if (nm.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(
- getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
- 0)));
- }
- schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
- releaseResources);
- schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
-
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
-
- // Now node data structures are upto date and ready for scheduling.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Node being looked for scheduling " + nm + " availableResource: "
- + node.getUnallocatedResource());
+ setLastNodeUpdateTime(Time.now());
+ super.nodeUpdate(nm);
+ if (!scheduleAsynchronously) {
+ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+ nm.getNodeID());
+ allocateContainersToNode(getNode(nm.getNodeID()));
+ ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+ nm.getNodeID());
}
} finally {
writeLock.unlock();
}
}
-
+
/**
* Process resource update on a node.
*/
@@ -1458,16 +1383,7 @@ public class CapacityScheduler extends
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
- RMNode node = nodeUpdatedEvent.getRMNode();
- setLastNodeUpdateTime(Time.now());
- nodeUpdate(node);
- if (!scheduleAsynchronously) {
- ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
- node.getNodeID());
- allocateContainersToNode(getNode(node.getNodeID()));
- ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
- node.getNodeID());
- }
+ nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:
@@ -2194,20 +2110,6 @@ public class CapacityScheduler extends
}
@Override
- public SchedulerHealth getSchedulerHealth() {
- return this.schedulerHealth;
- }
-
- private void setLastNodeUpdateTime(long time) {
- this.lastNodeUpdateTime = time;
- }
-
- @Override
- public long getLastNodeUpdateTime() {
- return lastNodeUpdateTime;
- }
-
- @Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d33c214..94fdb7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -70,8 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -92,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -130,7 +125,6 @@ public class FairScheduler extends
private Resource incrAllocation;
private QueueManager queueMgr;
- private volatile Clock clock;
private boolean usePortForNodeName;
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@@ -217,7 +211,6 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
- clock = SystemClock.getInstance();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
@@ -383,7 +376,7 @@ public class FairScheduler extends
* threshold for each type of task.
*/
private void updateStarvationStats() {
- lastPreemptionUpdateTime = clock.getTime();
+ lastPreemptionUpdateTime = getClock().getTime();
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
sched.updateStarvationStats();
}
@@ -616,15 +609,6 @@ public class FairScheduler extends
return continuousSchedulingSleepMs;
}
- public Clock getClock() {
- return clock;
- }
-
- @VisibleForTesting
- void setClock(Clock clock) {
- this.clock = clock;
- }
-
public FairSchedulerEventLog getEventLog() {
return eventLog;
}
@@ -1053,67 +1037,17 @@ public class FairScheduler extends
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens());
}
-
- /**
- * Process a heartbeat update from a node.
- */
- private void nodeUpdate(RMNode nm) {
+
+ @Override
+ protected synchronized void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
long start = getClock().getTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
- }
eventLog.log("HEARTBEAT", nm.getHostName());
- FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-
- List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
- List<ContainerStatus> newlyLaunchedContainers =
- new ArrayList<ContainerStatus>();
- List<ContainerStatus> completedContainers =
- new ArrayList<ContainerStatus>();
- for (UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(
- containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
-
- // Process completed containers
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- LOG.debug("Container FINISHED: " + containerId);
- super.completedContainer(getRMContainer(containerId),
- completedContainer, RMContainerEventType.FINISHED);
- }
-
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- if (nm.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(
- getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
- 0)));
- }
-
- if (continuousSchedulingEnabled) {
- if (!completedContainers.isEmpty()) {
- attemptScheduling(node);
- }
- } else{
- attemptScheduling(node);
- }
+ super.nodeUpdate(nm);
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
+ FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID());
+ attemptScheduling(fsNode);
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index e9ffd09..92acf75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -42,14 +42,12 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -385,10 +381,6 @@ public class FifoScheduler extends
}
}
- private FiCaSchedulerNode getNode(NodeId nodeId) {
- return nodeTracker.getNode(nodeId);
- }
-
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {
@@ -733,66 +725,6 @@ public class FifoScheduler extends
return assignedContainers;
}
- private synchronized void nodeUpdate(RMNode rmNode) {
- FiCaSchedulerNode node = getNode(rmNode.getNodeID());
-
- List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
- List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
- List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
- for(UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
-
- // Process completed containers
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- LOG.debug("Container FINISHED: " + containerId);
- super.completedContainer(getRMContainer(containerId),
- completedContainer, RMContainerEventType.FINISHED);
- }
-
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- rmNode.getAggregatedContainersUtilization());
- node.setNodeUtilization(rmNode.getNodeUtilization());
-
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- if (rmNode.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption
- .newInstance(getSchedulerNode(rmNode.getNodeID())
- .getAllocatedResource(), 0)));
- }
-
- if (rmContext.isWorkPreservingRecoveryEnabled()
- && !rmContext.isSchedulerReadyForAllocatingContainers()) {
- return;
- }
-
- if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
- node.getUnallocatedResource(), minimumAllocation)) {
- LOG.debug("Node heartbeat " + rmNode.getNodeID() +
- " available resource = " + node.getUnallocatedResource());
-
- assignContainers(node);
-
- LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
- + node.getUnallocatedResource());
- }
-
- updateAvailableResourcesMetrics();
- }
-
private void increaseUsedResources(RMContainer rmContainer) {
Resources.addTo(usedResource, rmContainer.getAllocatedResource());
}
@@ -910,7 +842,7 @@ public class FifoScheduler extends
container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
- FiCaSchedulerNode node = getNode(container.getNodeId());
+ FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + appId +
@@ -1025,4 +957,28 @@ public class FifoScheduler extends
// TODO Auto-generated method stub
}
+
+ @Override
+ protected synchronized void nodeUpdate(RMNode nm) {
+ super.nodeUpdate(nm);
+
+ FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID());
+ if (rmContext.isWorkPreservingRecoveryEnabled()
+ && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
+
+ if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
+ node.getUnallocatedResource(), minimumAllocation)) {
+ LOG.debug("Node heartbeat " + nm.getNodeID() +
+ " available resource = " + node.getUnallocatedResource());
+
+ assignContainers(node);
+
+ LOG.debug("Node after allocation " + nm.getNodeID() + " resource = "
+ + node.getUnallocatedResource());
+ }
+
+ updateAvailableResourcesMetrics();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index a115aac..b6329b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -1061,7 +1061,7 @@ public class TestProportionalCapacityPreemptionPolicy {
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
clusterResources);
- SchedulerNode mNode = mock(SchedulerNode.class);
+ FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class);
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org