You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ss...@apache.org on 2013/02/26 04:33:01 UTC

svn commit: r1450008 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/ hadoop-yarn/hadoop-yarn-server/hado...

Author: sseth
Date: Tue Feb 26 03:33:01 2013
New Revision: 1450008

URL: http://svn.apache.org/r1450008
Log:
merge YARN-365 from trunk. Change NM heartbeat handling to not generate a scheduler event on each heartbeat. Contributed by Xuan Gong.

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java
      - copied unchanged from r1450007, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Feb 26 03:33:01 2013
@@ -8,6 +8,9 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    YARN-365. Change NM heartbeat handling to not generate a scheduler event
+    on each heartbeat. (Xuan Gong via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Tue Feb 26 03:33:01 2013
@@ -106,4 +106,13 @@ public interface RMNode {
   public List<ApplicationId> getAppsToCleanup();
 
   public HeartbeatResponse getLastHeartBeatResponse();
+  
+  /**
+   * Get and clear the list of containerUpdates accumulated across NM
+   * heartbeats.
+   * 
+   * @return containerUpdates accumulated across NM heartbeats.
+   */
+  public List<UpdatedContainerInfo> pullContainerUpdates();
+  
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Feb 26 03:33:01 2013
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is used to keep track of all the applications/containers
  * running on a node.
@@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNod
   private final ReadLock readLock;
   private final WriteLock writeLock;
 
+  private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+  private volatile boolean nextHeartBeat = true;
+
   private final NodeId nodeId;
   private final RMContext context;
   private final String hostName;
@@ -186,6 +192,7 @@ public class RMNodeImpl implements RMNod
 
     this.stateMachine = stateMachineFactory.make(this);
     
+    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();  
   }
 
   @Override
@@ -400,6 +407,7 @@ public class RMNodeImpl implements RMNod
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Kill containers since node is rejoining.
+      rmNode.nodeUpdateQueue.clear();
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
 
@@ -458,6 +466,7 @@ public class RMNodeImpl implements RMNod
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
+      rmNode.nodeUpdateQueue.clear();
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
       rmNode.context.getDispatcher().getEventHandler().handle(
@@ -489,6 +498,7 @@ public class RMNodeImpl implements RMNod
           statusEvent.getNodeHealthStatus();
       rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
+        rmNode.nodeUpdateQueue.clear();
         // Inform the scheduler
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
@@ -538,10 +548,16 @@ public class RMNodeImpl implements RMNod
           completedContainers.add(remoteContainer);
         }
       }
-
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, 
-              completedContainers));
+      if(newlyLaunchedContainers.size() != 0 
+          || completedContainers.size() != 0) {
+        rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
+            (newlyLaunchedContainers, completedContainers));
+      }
+      if(rmNode.nextHeartBeat) {
+        rmNode.nextHeartBeat = false;
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeUpdateSchedulerEvent(rmNode));
+      }
       
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
@@ -584,4 +600,25 @@ public class RMNodeImpl implements RMNod
       return NodeState.UNHEALTHY;
     }
   }
+
+  @Override
+  public List<UpdatedContainerInfo> pullContainerUpdates() {
+    List<UpdatedContainerInfo> latestContainerInfoList = 
+        new ArrayList<UpdatedContainerInfo>();
+    while(nodeUpdateQueue.peek() != null){
+      latestContainerInfoList.add(nodeUpdateQueue.poll());
+    }
+    this.nextHeartBeat = true;
+    return latestContainerInfoList;
+  }
+
+  @VisibleForTesting
+  public void setNextHeartBeat(boolean nextHeartBeat) {
+    this.nextHeartBeat = nextHeartBeat;
+  }
+  
+  @VisibleForTesting
+  public int getQueueSize() {
+    return nodeUpdateQueue.size();
+  }
  }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Feb 26 03:33:01 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -562,15 +563,20 @@ implements ResourceScheduler, CapacitySc
     return root.getQueueUserAclInfo(user);
   }
 
-  private synchronized void nodeUpdate(RMNode nm, 
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     }
-                  
-    FiCaSchedulerNode node = getNode(nm.getNodeID());
 
+    FiCaSchedulerNode node = getNode(nm.getNodeID());
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    }
+    
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -666,9 +672,7 @@ implements ResourceScheduler, CapacitySc
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
     case APP_ADDED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Tue Feb 26 03:33:01 2013
@@ -18,35 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class NodeUpdateSchedulerEvent extends SchedulerEvent {
 
   private final RMNode rmNode;
-  private final List<ContainerStatus> newlyLaunchedContainers;
-  private final List<ContainerStatus> completedContainersStatuses;
 
-  public NodeUpdateSchedulerEvent(RMNode rmNode,
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  public NodeUpdateSchedulerEvent(RMNode rmNode) {
     super(SchedulerEventType.NODE_UPDATE);
     this.rmNode = rmNode;
-    this.newlyLaunchedContainers = newlyLaunchedContainers;
-    this.completedContainersStatuses = completedContainers;
   }
 
   public RMNode getRMNode() {
     return rmNode;
   }
-
-  public List<ContainerStatus> getNewlyLaunchedContainers() {
-    return newlyLaunchedContainers;
-  }
-
-  public List<ContainerStatus> getCompletedContainers() {
-    return completedContainersStatuses;
-  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Feb 26 03:33:01 2013
@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
@@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -746,15 +746,20 @@ public class FairScheduler implements Re
   /**
    * Process a heartbeat update from a node.
    */
-  private synchronized void nodeUpdate(RMNode nm,
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = nodes.get(nm.getNodeID());
 
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    } 
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -860,9 +865,7 @@ public class FairScheduler implements Re
         throw new RuntimeException("Unexpected event type: " + event);
       }
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(),
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
       break;
     case APP_ADDED:
       if (!(event instanceof AppAddedSchedulerEvent)) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Feb 26 03:33:01 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -576,11 +577,16 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private synchronized void nodeUpdate(RMNode rmNode, 
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode rmNode) {
     FiCaSchedulerNode node = getNode(rmNode.getNodeID());
     
+    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    }
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -628,9 +634,7 @@ public class FifoScheduler implements Re
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = 
       (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
     case APP_ADDED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Tue Feb 26 03:33:01 2013
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
 import com.google.common.collect.Lists;
 
@@ -187,6 +190,11 @@ public class MockNodes {
     public HeartbeatResponse getLastHeartBeatResponse() {
       return null;
     }
+
+    @Override
+    public List<UpdatedContainerInfo> pullContainerUpdates() {
+      return new ArrayList<UpdatedContainerInfo>();
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Tue Feb 26 03:33:01 2013
@@ -201,7 +201,7 @@ public class TestFifoScheduler {
     testMinimumAllocation(conf, allocMB / 2);
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     conf.setQueues("default", new String[] {"default"});
@@ -215,19 +215,19 @@ public class TestFifoScheduler {
     fs.handle(new NodeAddedSchedulerEvent(n1));
     fs.handle(new NodeAddedSchedulerEvent(n2));
     List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
     Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
 
     // reconnect n1 with downgraded memory
     n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
     fs.handle(new NodeRemovedSchedulerEvent(n1));
     fs.handle(new NodeAddedSchedulerEvent(n1));
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testHeadroom() throws Exception {
     
     Configuration conf = new Configuration();
@@ -275,7 +275,7 @@ public class TestFifoScheduler {
     fs.allocate(appAttemptId2, ask2, emptyId);
     
     // Trigger container assignment
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
     
     // Get the allocation for the applications and verify headroom
     Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Tue Feb 26 03:33:01 2013
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnsw
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -63,7 +65,7 @@ public class TestRMNodeTransitions {
   private YarnScheduler scheduler;
 
   private SchedulerEventType eventType;
-  private List<ContainerStatus> completedContainers;
+  private List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
   
   private final class TestSchedulerEventDispatcher implements
   EventHandler<SchedulerEvent> {
@@ -89,10 +91,11 @@ public class TestRMNodeTransitions {
             final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
             eventType = event.getType();
             if (eventType == SchedulerEventType.NODE_UPDATE) {
-              completedContainers = 
-                  ((NodeUpdateSchedulerEvent)event).getCompletedContainers();
-            } else {
-              completedContainers = null;
+              List<UpdatedContainerInfo> lastestContainersInfoList = 
+                  ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
+              for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
+            	  completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); 
+              }
             }
             return null;
           }
@@ -125,16 +128,16 @@ public class TestRMNodeTransitions {
     return event;
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
     node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
-		ContainerId completedContainerId = BuilderUtils.newContainerId(
-				BuilderUtils.newApplicationAttemptId(
-						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
     node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     
@@ -146,9 +149,110 @@ public class TestRMNodeTransitions {
     doReturn(Collections.singletonList(containerStatus)).
         when(statusEvent).getContainers();
     node.handle(statusEvent);
-    Assert.assertEquals(0, completedContainers.size());
+    /* Expect the scheduler call handle function 2 times
+     * 1. RMNode status from new to Running, handle the add_node event
+     * 2. handle the node update event
+     */
+    verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));     
   }
-
+  
+  @Test (timeout = 5000)
+  public void testContainerUpdate() throws InterruptedException{
+    //Start the node
+    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    
+    NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
+    RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    
+    ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 2);
+ 
+    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
+    
+    ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
+
+    doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode1))
+        .when(statusEventFromNode1).getContainers();
+    node.handle(statusEventFromNode1);
+    Assert.assertEquals(1, completedContainers.size());
+    Assert.assertEquals(completedContainerIdFromNode1,
+        completedContainers.get(0).getContainerId());
+
+    completedContainers.clear();
+
+    doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode2_1))
+        .when(statusEventFromNode2_1).getContainers();
+
+    doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode2_2))
+        .when(statusEventFromNode2_2).getContainers();
+
+    node2.setNextHeartBeat(false);
+    node2.handle(statusEventFromNode2_1);
+    node2.setNextHeartBeat(true);
+    node2.handle(statusEventFromNode2_2);
+
+    Assert.assertEquals(2, completedContainers.size());
+    Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
+        .getContainerId()); 
+    Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
+        .getContainerId());   
+  }
+  
+  @Test (timeout = 5000)
+  public void testStatusChange(){
+    //Start the node
+    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    //Add info to the queue first
+    node.setNextHeartBeat(false);
+
+    ContainerId completedContainerId1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+        
+    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatus2 = mock(ContainerStatus.class);
+
+    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
+    doReturn(Collections.singletonList(containerStatus1))
+        .when(statusEvent1).getContainers();
+     
+    doReturn(completedContainerId2).when(containerStatus2).getContainerId();
+    doReturn(Collections.singletonList(containerStatus2))
+        .when(statusEvent2).getContainers();
+
+    verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); 
+    node.handle(statusEvent1);
+    node.handle(statusEvent2);
+    verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+    Assert.assertEquals(2, node.getQueueSize());
+    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
+    Assert.assertEquals(0, node.getQueueSize());
+  }
+  
   @Test
   public void testRunningExpire() {
     RMNodeImpl node = getRunningNode();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1450008&r1=1450007&r2=1450008&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Feb 26 03:33:01 2013
@@ -30,7 +30,6 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -267,7 +266,7 @@ public class TestFairScheduler {
     Assert.assertEquals(3, queueManager.getLeafQueues().size());
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testSimpleContainerAllocation() {
     // Add a node
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -283,8 +282,7 @@ public class TestFairScheduler {
 
     scheduler.update();
 
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
 
     // Asked for less than min_allocation.
@@ -292,15 +290,14 @@ public class TestFairScheduler {
         scheduler.getQueueManager().getQueue("queue1").
         getResourceUsage().getMemory());
 
-    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
-        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
       getResourceUsage().getMemory());
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testSimpleContainerReservation() throws InterruptedException {
     // Add a node
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -310,8 +307,7 @@ public class TestFairScheduler {
     // Queue 1 requests full capacity of node
     createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
 
     // Make sure queue 1 is allocated app capacity
@@ -331,8 +327,7 @@ public class TestFairScheduler {
     // Now another node checks in with capacity
     RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
-        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
     scheduler.handle(updateEvent2);
 
@@ -729,7 +724,7 @@ public class TestFairScheduler {
     assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testIsStarvedForMinShare() throws Exception {
     Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -758,8 +753,7 @@ public class TestFairScheduler {
     // Queue A wants 3 * 1024. Node update gives this all to A
     createSchedulingRequest(3 * 1024, "queueA", "user1");
     scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
-        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(nodeEvent2);
 
     // Queue B arrives and wants 1 * 1024
@@ -788,7 +782,7 @@ public class TestFairScheduler {
     }
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testIsStarvedForFairShare() throws Exception {
     Configuration conf = createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -817,8 +811,7 @@ public class TestFairScheduler {
     // Queue A wants 3 * 1024. Node update gives this all to A
     createSchedulingRequest(3 * 1024, "queueA", "user1");
     scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
-        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(nodeEvent2);
 
     // Queue B arrives and wants 1 * 1024
@@ -848,7 +841,7 @@ public class TestFairScheduler {
     }
   }
 
-  @Test
+  @Test (timeout = 5000)
   /**
    * Make sure containers are chosen to be preempted in the correct order. Right
    * now this means decreasing order of priority.
@@ -912,16 +905,13 @@ public class TestFairScheduler {
 
     // Sufficient node check-ins to fully schedule containers
     for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
 
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
       scheduler.handle(nodeUpdate2);
 
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
       scheduler.handle(nodeUpdate3);
     }
 
@@ -982,7 +972,7 @@ public class TestFairScheduler {
     assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
   }
 
-  @Test
+  @Test (timeout = 5000)
   /**
    * Tests the timing of decision to preempt tasks.
    */
@@ -1053,16 +1043,13 @@ public class TestFairScheduler {
 
     // Sufficient node check-ins to fully schedule containers
     for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
 
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
       scheduler.handle(nodeUpdate2);
 
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
-          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
       scheduler.handle(nodeUpdate3);
     }
 
@@ -1110,7 +1097,7 @@ public class TestFairScheduler {
         Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() {
     // Add a node
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1120,8 +1107,7 @@ public class TestFairScheduler {
     // Request full capacity of node
     createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
 
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1137,7 +1123,7 @@ public class TestFairScheduler {
         scheduler.applications.get(attId2).getCurrentReservation().getMemory());
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testUserMaxRunningApps() throws Exception {
     // Set max running apps
     Configuration conf = createConfiguration();
@@ -1166,8 +1152,7 @@ public class TestFairScheduler {
         "user1", 1);
     
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
     
     // App 1 should be running
@@ -1192,7 +1177,7 @@ public class TestFairScheduler {
     assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() {
     // Add a node
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1202,8 +1187,7 @@ public class TestFairScheduler {
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
         "user1", 1, 2);
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
     
     FSSchedulerApp app = scheduler.applications.get(attId);
@@ -1276,7 +1260,7 @@ public class TestFairScheduler {
     assertNull("The application was allowed", app2);
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testMultipleNodesSingleRackRequest() throws Exception {
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
     RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1303,22 +1287,20 @@ public class TestFairScheduler {
     
     // node 1 checks in
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent1);
     // should assign node local
     assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size());
 
     // node 2 checks in
     scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
     // should assign rack local
     assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size());
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testFifoWithinQueue() throws Exception {
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3072));
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
@@ -1342,8 +1324,7 @@ public class TestFairScheduler {
     // Because tests set assignmultiple to false, each heartbeat assigns a single
     // container.
     
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
-      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
 
     scheduler.handle(updateEvent);
     assertEquals(1, app1.getLiveContainers().size());