You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/21 04:17:54 UTC

hadoop git commit: YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk a064865ab -> 754cb4e30


YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/754cb4e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/754cb4e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/754cb4e3

Branch: refs/heads/trunk
Commit: 754cb4e30fac1c5fe8d44626968c0ddbfe459335
Parents: a064865
Author: Karthik Kambatla <ka...@apache.org>
Authored: Thu Oct 20 21:17:48 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Thu Oct 20 21:17:48 2016 -0700

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        | 186 ++++++++++++++++++-
 .../scheduler/capacity/CapacityScheduler.java   | 122 ++----------
 .../scheduler/fair/FairScheduler.java           |  80 +-------
 .../scheduler/fifo/FifoScheduler.java           |  94 +++-------
 ...estProportionalCapacityPreemptionPolicy.java |   4 +-
 5 files changed, 225 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 645e06d..df59556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -73,7 +74,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
@@ -94,10 +100,14 @@ public abstract class AbstractYarnScheduler
   protected Resource minimumAllocation;
 
   protected volatile RMContext rmContext;
-  
+
   private volatile Priority maxClusterLevelAppPriority;
 
   protected ActivitiesManager activitiesManager;
+  protected SchedulerHealth schedulerHealth = new SchedulerHealth();
+  protected volatile long lastNodeUpdateTime;
+
+  private volatile Clock clock;
 
   /*
    * All schedulers which are inheriting AbstractYarnScheduler should use
@@ -130,6 +140,7 @@ public abstract class AbstractYarnScheduler
    */
   public AbstractYarnScheduler(String name) {
     super(name);
+    clock = SystemClock.getInstance();
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -228,13 +239,25 @@ public abstract class AbstractYarnScheduler
     nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
   }
 
+  public SchedulerHealth getSchedulerHealth() {
+    return this.schedulerHealth;
+  }
+
+  protected void setLastNodeUpdateTime(long time) {
+    this.lastNodeUpdateTime = time;
+  }
+
+  public long getLastNodeUpdateTime() {
+    return lastNodeUpdateTime;
+  }
+
   protected void containerLaunchedOnNode(
       ContainerId containerId, SchedulerNode node) {
     try {
       readLock.lock();
       // Get the application for the finished container
-      SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
-          containerId);
+      SchedulerApplicationAttempt application =
+          getCurrentAttemptForContainer(containerId);
       if (application == null) {
         LOG.info("Unknown application " + containerId.getApplicationAttemptId()
             .getApplicationId() + " launched container " + containerId
@@ -249,7 +272,7 @@ public abstract class AbstractYarnScheduler
       readLock.unlock();
     }
   }
-  
+
   protected void containerIncreasedOnNode(ContainerId containerId,
       SchedulerNode node, Container increasedContainerReportedByNM) {
     /*
@@ -276,6 +299,7 @@ public abstract class AbstractYarnScheduler
     }
     rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
         increasedContainerReportedByNM.getResource()));
+
   }
 
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
@@ -360,7 +384,7 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  public void recoverContainersOnNode(
+  public synchronized void recoverContainersOnNode(
       List<NMContainerStatus> containerReports, RMNode nm) {
     try {
       writeLock.lock();
@@ -475,7 +499,7 @@ public abstract class AbstractYarnScheduler
   }
 
   /**
-   * Recover resource request back from RMContainer when a container is 
+   * Recover resource request back from RMContainer when a container is
    * preempted before AM pulled the same. If container is pulled by
    * AM, then RMContainer will not have resource request to recover.
    * @param rmContainer rmContainer
@@ -621,7 +645,7 @@ public abstract class AbstractYarnScheduler
       SchedulerApplicationAttempt attempt);
 
   @Override
-  public SchedulerNode getSchedulerNode(NodeId nodeId) {
+  public N getSchedulerNode(NodeId nodeId) {
     return nodeTracker.getNode(nodeId);
   }
 
@@ -832,4 +856,152 @@ public abstract class AbstractYarnScheduler
     return this.activitiesManager;
   }
 
+  public Clock getClock() {
+    return clock;
+  }
+
+  @VisibleForTesting
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  @Lock(Lock.NoLock.class)
+  public SchedulerNode getNode(NodeId nodeId) {
+    return nodeTracker.getNode(nodeId);
+  }
+
+  /**
+   * Get lists of new containers from NodeManager and process them.
+   * @param nm The RMNode corresponding to the NodeManager
+   * @return list of completed containers
+   */
+  protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
+    SchedulerNode node = getNode(nm.getNodeID());
+
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers =
+        new ArrayList<>();
+    List<ContainerStatus> completedContainers =
+        new ArrayList<>();
+
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers
+          .addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    }
+
+    // Processing the newly launched containers
+    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+    }
+
+    // Processing the newly increased containers
+    List<Container> newlyIncreasedContainers =
+        nm.pullNewlyIncreasedContainers();
+    for (Container container : newlyIncreasedContainers) {
+      containerIncreasedOnNode(container.getId(), node, container);
+    }
+
+    return completedContainers;
+  }
+
+  /**
+   * Process completed container list.
+   * @param completedContainers Extracted list of completed containers
+   * @param releasedResources Reference resource object for completed containers
+   * @return The total number of released containers
+   */
+  protected int updateCompletedContainers(List<ContainerStatus>
+      completedContainers, Resource releasedResources) {
+    int releasedContainers = 0;
+    for (ContainerStatus completedContainer : completedContainers) {
+      ContainerId containerId = completedContainer.getContainerId();
+      LOG.debug("Container FINISHED: " + containerId);
+      RMContainer container = getRMContainer(containerId);
+      completedContainer(getRMContainer(containerId),
+          completedContainer, RMContainerEventType.FINISHED);
+      if (container != null) {
+        releasedContainers++;
+        Resource ars = container.getAllocatedResource();
+        if (ars != null) {
+          Resources.addTo(releasedResources, ars);
+        }
+        Resource rrs = container.getReservedResource();
+        if (rrs != null) {
+          Resources.addTo(releasedResources, rrs);
+        }
+      }
+    }
+    return releasedContainers;
+  }
+
+  /**
+   * Update schedulerHealth information.
+   * @param releasedResources Reference resource object for completed containers
+   * @param releasedContainers Count of released containers
+   */
+  protected void updateSchedulerHealthInformation(Resource releasedResources,
+      int releasedContainers) {
+
+    schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(),
+        releasedResources);
+    schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
+  }
+
+  /**
+   * Update container and utilization information on the NodeManager.
+   * @param nm The NodeManager to update
+   */
+  protected void updateNodeResourceUtilization(RMNode nm) {
+    SchedulerNode node = getNode(nm.getNodeID());
+    // Updating node resource utilization
+    node.setAggregatedContainersUtilization(
+        nm.getAggregatedContainersUtilization());
+    node.setNodeUtilization(nm.getNodeUtilization());
+
+  }
+
+  /**
+   * Process a heartbeat update from a node.
+   * @param nm The RMNode corresponding to the NodeManager
+   */
+  protected synchronized void nodeUpdate(RMNode nm) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("nodeUpdate: " + nm +
+          " cluster capacity: " + getClusterResource());
+    }
+
+    // Process new container information
+    List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
+
+    // Process completed containers
+    Resource releasedResources = Resource.newInstance(0, 0);
+    int releasedContainers = updateCompletedContainers(completedContainers,
+        releasedResources);
+
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    // TODO YARN-5128: Fix possible race-condition when request comes in before
+    // update is propagated
+    if (nm.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(nm.getNodeID())
+                      .getAllocatedResource(), 0)));
+    }
+
+    updateSchedulerHealthInformation(releasedResources, releasedContainers);
+    updateNodeResourceUtilization(nm);
+
+    // Now node data structures are up-to-date and ready for scheduling.
+    if(LOG.isDebugEnabled()) {
+      SchedulerNode node = getNode(nm.getNodeID());
+      LOG.debug("Node being looked for scheduling " + nm +
+          " availableResource: " + node.getUnallocatedResource());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6d00bee..cfdcb10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -89,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -235,8 +231,6 @@ public class CapacityScheduler extends
   private boolean scheduleAsynchronously;
   private AsyncScheduleThread asyncSchedulerThread;
   private RMNodeLabelsManager labelManager;
-  private SchedulerHealth schedulerHealth = new SchedulerHealth();
-  volatile long lastNodeUpdateTime;
 
   /**
    * EXPERT
@@ -1099,93 +1093,24 @@ public class CapacityScheduler extends
     return root.getQueueUserAclInfo(user);
   }
 
-  private void nodeUpdate(RMNode nm) {
+  @Override
+  protected synchronized void nodeUpdate(RMNode nm) {
     try {
       writeLock.lock();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "nodeUpdate: " + nm + " clusterResources: " + getClusterResource());
-      }
-
-      Resource releaseResources = Resource.newInstance(0, 0);
-
-      FiCaSchedulerNode node = getNode(nm.getNodeID());
-
-      List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-      List<ContainerStatus> newlyLaunchedContainers =
-          new ArrayList<ContainerStatus>();
-      List<ContainerStatus> completedContainers =
-          new ArrayList<ContainerStatus>();
-      for (UpdatedContainerInfo containerInfo : containerInfoList) {
-        newlyLaunchedContainers.addAll(
-            containerInfo.getNewlyLaunchedContainers());
-        completedContainers.addAll(containerInfo.getCompletedContainers());
-      }
-
-      // Processing the newly launched containers
-      for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-        containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-      }
-
-      // Processing the newly increased containers
-      List<Container> newlyIncreasedContainers =
-          nm.pullNewlyIncreasedContainers();
-      for (Container container : newlyIncreasedContainers) {
-        containerIncreasedOnNode(container.getId(), node, container);
-      }
-
-      // Process completed containers
-      int releasedContainers = 0;
-      for (ContainerStatus completedContainer : completedContainers) {
-        ContainerId containerId = completedContainer.getContainerId();
-        RMContainer container = getRMContainer(containerId);
-        super.completedContainer(container, completedContainer,
-            RMContainerEventType.FINISHED);
-        if (container != null) {
-          releasedContainers++;
-          Resource rs = container.getAllocatedResource();
-          if (rs != null) {
-            Resources.addTo(releaseResources, rs);
-          }
-          rs = container.getReservedResource();
-          if (rs != null) {
-            Resources.addTo(releaseResources, rs);
-          }
-        }
-      }
-
-      // If the node is decommissioning, send an update to have the total
-      // resource equal to the used resource, so no available resource to
-      // schedule.
-      // TODO: Fix possible race-condition when request comes in before
-      // update is propagated
-      if (nm.getState() == NodeState.DECOMMISSIONING) {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                .newInstance(
-                    getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
-                    0)));
-      }
-      schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
-          releaseResources);
-      schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
-
-      // Updating node resource utilization
-      node.setAggregatedContainersUtilization(
-          nm.getAggregatedContainersUtilization());
-      node.setNodeUtilization(nm.getNodeUtilization());
-
-      // Now node data structures are upto date and ready for scheduling.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "Node being looked for scheduling " + nm + " availableResource: "
-                + node.getUnallocatedResource());
+      setLastNodeUpdateTime(Time.now());
+      super.nodeUpdate(nm);
+      if (!scheduleAsynchronously) {
+        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+            nm.getNodeID());
+        allocateContainersToNode(getNode(nm.getNodeID()));
+        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+            nm.getNodeID());
       }
     } finally {
       writeLock.unlock();
     }
   }
-  
+
   /**
    * Process resource update on a node.
    */
@@ -1458,16 +1383,7 @@ public class CapacityScheduler extends
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      RMNode node = nodeUpdatedEvent.getRMNode();
-      setLastNodeUpdateTime(Time.now());
-      nodeUpdate(node);
-      if (!scheduleAsynchronously) {
-        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
-            node.getNodeID());
-        allocateContainersToNode(getNode(node.getNodeID()));
-        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
-            node.getNodeID());
-      }
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
     case APP_ADDED:
@@ -2194,20 +2110,6 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public SchedulerHealth getSchedulerHealth() {
-    return this.schedulerHealth;
-  }
-
-  private void setLastNodeUpdateTime(long time) {
-    this.lastNodeUpdateTime = time;
-  }
-
-  @Override
-  public long getLastNodeUpdateTime() {
-    return lastNodeUpdateTime;
-  }
-
-  @Override
   public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
       String user, String queueName, ApplicationId applicationId)
       throws YarnException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d33c214..94fdb7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -70,8 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -92,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -130,7 +125,6 @@ public class FairScheduler extends
 
   private Resource incrAllocation;
   private QueueManager queueMgr;
-  private volatile Clock clock;
   private boolean usePortForNodeName;
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@@ -217,7 +211,6 @@ public class FairScheduler extends
 
   public FairScheduler() {
     super(FairScheduler.class.getName());
-    clock = SystemClock.getInstance();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
@@ -383,7 +376,7 @@ public class FairScheduler extends
    * threshold for each type of task.
    */
   private void updateStarvationStats() {
-    lastPreemptionUpdateTime = clock.getTime();
+    lastPreemptionUpdateTime = getClock().getTime();
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       sched.updateStarvationStats();
     }
@@ -616,15 +609,6 @@ public class FairScheduler extends
     return continuousSchedulingSleepMs;
   }
 
-  public Clock getClock() {
-    return clock;
-  }
-
-  @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
   public FairSchedulerEventLog getEventLog() {
     return eventLog;
   }
@@ -1053,67 +1037,17 @@ public class FairScheduler extends
         preemptionContainerIds, null, null,
         application.pullUpdatedNMTokens());
   }
-  
-  /**
-   * Process a heartbeat update from a node.
-   */
-  private void nodeUpdate(RMNode nm) {
+
+  @Override
+  protected synchronized void nodeUpdate(RMNode nm) {
     try {
       writeLock.lock();
       long start = getClock().getTime();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
-      }
       eventLog.log("HEARTBEAT", nm.getHostName());
-      FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-
-      List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-      List<ContainerStatus> newlyLaunchedContainers =
-          new ArrayList<ContainerStatus>();
-      List<ContainerStatus> completedContainers =
-          new ArrayList<ContainerStatus>();
-      for (UpdatedContainerInfo containerInfo : containerInfoList) {
-        newlyLaunchedContainers.addAll(
-            containerInfo.getNewlyLaunchedContainers());
-        completedContainers.addAll(containerInfo.getCompletedContainers());
-      }
-      // Processing the newly launched containers
-      for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-        containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-      }
-
-      // Process completed containers
-      for (ContainerStatus completedContainer : completedContainers) {
-        ContainerId containerId = completedContainer.getContainerId();
-        LOG.debug("Container FINISHED: " + containerId);
-        super.completedContainer(getRMContainer(containerId),
-            completedContainer, RMContainerEventType.FINISHED);
-      }
-
-      // If the node is decommissioning, send an update to have the total
-      // resource equal to the used resource, so no available resource to
-      // schedule.
-      if (nm.getState() == NodeState.DECOMMISSIONING) {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                .newInstance(
-                    getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
-                    0)));
-      }
-
-      if (continuousSchedulingEnabled) {
-        if (!completedContainers.isEmpty()) {
-          attemptScheduling(node);
-        }
-      } else{
-        attemptScheduling(node);
-      }
+      super.nodeUpdate(nm);
 
-      // Updating node resource utilization
-      node.setAggregatedContainersUtilization(
-          nm.getAggregatedContainersUtilization());
-      node.setNodeUtilization(nm.getNodeUtilization());
+      FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID());
+      attemptScheduling(fsNode);
 
       long duration = getClock().getTime() - start;
       fsOpDurations.addNodeUpdateDuration(duration);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index e9ffd09..92acf75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -42,14 +42,12 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -385,10 +381,6 @@ public class FifoScheduler extends
     }
   }
 
-  private FiCaSchedulerNode getNode(NodeId nodeId) {
-    return nodeTracker.getNode(nodeId);
-  }
-
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
       String queue, String user, boolean isAppRecovering) {
@@ -733,66 +725,6 @@ public class FifoScheduler extends
     return assignedContainers;
   }
 
-  private synchronized void nodeUpdate(RMNode rmNode) {
-    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
-    
-    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for(UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    }
-    // Processing the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
-
-    // Process completed containers
-    for (ContainerStatus completedContainer : completedContainers) {
-      ContainerId containerId = completedContainer.getContainerId();
-      LOG.debug("Container FINISHED: " + containerId);
-      super.completedContainer(getRMContainer(containerId),
-          completedContainer, RMContainerEventType.FINISHED);
-    }
-
-    // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
-        rmNode.getAggregatedContainersUtilization());
-    node.setNodeUtilization(rmNode.getNodeUtilization());
-
-    // If the node is decommissioning, send an update to have the total
-    // resource equal to the used resource, so no available resource to
-    // schedule.
-    if (rmNode.getState() == NodeState.DECOMMISSIONING) {
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(
-              new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(rmNode.getNodeID())
-                      .getAllocatedResource(), 0)));
-    }
-
-    if (rmContext.isWorkPreservingRecoveryEnabled()
-        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
-      return;
-    }
-
-    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
-            node.getUnallocatedResource(), minimumAllocation)) {
-      LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
-          " available resource = " + node.getUnallocatedResource());
-
-      assignContainers(node);
-
-      LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
-          + node.getUnallocatedResource());
-    }
-
-    updateAvailableResourcesMetrics();
-  }
-
   private void increaseUsedResources(RMContainer rmContainer) {
     Resources.addTo(usedResource, rmContainer.getAllocatedResource());
   }
@@ -910,7 +842,7 @@ public class FifoScheduler extends
         container.getId().getApplicationAttemptId().getApplicationId();
     
     // Get the node on which the container was allocated
-    FiCaSchedulerNode node = getNode(container.getNodeId());
+    FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId());
     
     if (application == null) {
       LOG.info("Unknown application: " + appId + 
@@ -1025,4 +957,28 @@ public class FifoScheduler extends
     // TODO Auto-generated method stub
     
   }
+  /** FIFO node update: run the shared update, then try to allocate on it. */
+  @Override
+  protected synchronized void nodeUpdate(RMNode nm) {
+    super.nodeUpdate(nm); // base-class part of the update runs first
+    // Cast needed: presumably getNode is not declared to return the FiCa type.
+    FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID());
+    if (rmContext.isWorkPreservingRecoveryEnabled()
+        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+      return; // recovery enabled and not ready to allocate: skip steps below
+    }
+    // Allocate only if unallocated resource is at least minimumAllocation.
+    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
+        node.getUnallocatedResource(), minimumAllocation)) {
+      LOG.debug("Node heartbeat " + nm.getNodeID() +
+          " available resource = " + node.getUnallocatedResource());
+
+      assignContainers(node);
+
+      LOG.debug("Node after allocation " + nm.getNodeID() + " resource = "
+          + node.getUnallocatedResource());
+    }
+    // Not reached when the recovery guard above returned early.
+    updateAvailableResourcesMetrics();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/754cb4e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index a115aac..b6329b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -1061,7 +1061,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
         clusterResources);
 
-    SchedulerNode mNode = mock(SchedulerNode.class);
+    FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class);
     when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
     when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org