You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/04/05 10:40:07 UTC

svn commit: r1088923 [2/2] - in /hadoop/mapreduce/branches/MR-279: mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ src/contrib/fairsc...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1088923&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Tue Apr  5 08:40:05 2011
@@ -0,0 +1,340 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+
+/**
+ * This class is used by ClusterInfo to keep track of all the applications/containers
+ * running on a node, together with the node's used and available resources.
+ * Methods that change the container or resource bookkeeping are synchronized on this instance.
+ */
+@Private
+@Unstable
+public class NodeManagerImpl implements NodeManager {
+  private static final Log LOG = LogFactory.getLog(NodeManager.class);
+  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final NodeId nodeId;
+  private final String hostName;
+  private Resource totalCapability;
+  private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+  private final Node node;
+  private final NodeHealthStatus nodeHealthStatus = recordFactory
+      .newRecordInstance(NodeHealthStatus.class);
+
+  private static final Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+  private static final List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+  private static final ApplicationId[] EMPTY_APPLICATION_ARRAY = new ApplicationId[]{};
+  private static final List<ApplicationId> EMPTY_APPLICATION_LIST = Arrays.asList(EMPTY_APPLICATION_ARRAY);
+
+  public static final String ANY = "*";
+  /* containers allocated on this node that have not yet been seen in a status update */
+  private final Map<ContainerId, Container> allocatedContainers =
+    new TreeMap<ContainerId, Container>();
+
+  /* set of containers that are currently active on a node manager */
+  private final Map<ContainerId, Container> activeContainers =
+    new TreeMap<ContainerId, Container>();
+
+  /* released containers that are returned for clean-up with the next status update */
+  private final Set<Container> containersToClean =
+    new TreeSet<Container>(new org.apache.hadoop.yarn.server.resourcemanager.resource.Container.Comparator());
+
+  /* the list of applications that have finished and need to be purged */
+  private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+
+  /* number of containers charged to this node; only written from synchronized methods */
+  private volatile int numContainers;
+
+  /** Creates the tracker for a node; the whole capability is added to the available resource. */
+  public NodeManagerImpl(NodeId nodeId, String hostname,
+      Node node, Resource capability) {
+    this.nodeId = nodeId;
+    this.totalCapability = capability;
+    this.hostName = hostname;
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(availableResource, capability);
+    this.node = node;
+  }
+
+  /**
+   * NodeInfo for this node.
+   * @return this instance, which is the {@link NodeInfo} for this node.
+   */
+  public NodeInfo getNodeInfo() {
+    return this;
+  }
+
+  /**
+   * The Scheduler has allocated containers on this node to the
+   * given application.
+   *
+   * @param applicationId application; used only for error logging here
+   * @param containers allocated containers; null is logged and ignored
+   */
+  public synchronized void allocateContainer(ApplicationId applicationId,
+      List<Container> containers) {
+    if (containers == null) {
+      LOG.error("Adding null containers for application " + applicationId);
+      return;
+    }
+    for (Container container : containers) {
+      allocateContainer(container);
+    }
+
+    LOG.info("addContainers:" +
+        " node=" + getHostName() +
+        " #containers=" + containers.size() +
+        " available=" + getAvailableResource().getMemory() +
+        " used=" + getUsedResource().getMemory());
+  }
+
+  /**
+   * Status update from the NodeManager.
+   * @param allContainers containers reported by the node, grouped under
+   *        string keys; the keys are not used here
+   * @return the response computed for all reported containers, or a response
+   *         made of empty lists if <code>allContainers</code> is null
+   */
+  public synchronized NodeResponse
+    statusUpdate(Map<String,List<Container>> allContainers) {
+    if (allContainers == null) {
+      return new NodeResponse(EMPTY_APPLICATION_LIST, EMPTY_CONTAINER_LIST,
+          EMPTY_CONTAINER_LIST);
+    }
+
+    // Flatten the per-key lists into a single list of reported containers.
+    List<Container> listContainers = new ArrayList<Container>();
+    for (List<Container> reported : allContainers.values()) {
+      listContainers.addAll(reported);
+    }
+    return update(listContainers);
+  }
+
+  /**
+   * Applies the container statuses reported by the node.
+   * @param containers containers update.
+   * @return the applications recorded as finished, the containers reported
+   *         as complete, and the containers that should be cleaned up.
+   */
+  private synchronized NodeResponse update(List<Container> containers) {
+    List<Container> completedContainers = new ArrayList<Container>();
+    List<Container> containersToCleanUp = new ArrayList<Container>();
+    List<ApplicationId> lastfinishedApplications = new ArrayList<ApplicationId>();
+
+    for (Container container : containers) {
+      // First report of an allocated container: from now on it is tracked as active.
+      if (allocatedContainers.remove(container.getId()) != null) {
+        activeContainers.put(container.getId(), container);
+        LOG.info("Activated container " + container.getId() + " on node " +
+         getHostName());
+      }
+
+      if (container.getState() == ContainerState.COMPLETE) {
+        // Resources are returned only if the container was still tracked as active.
+        if (activeContainers.remove(container.getId()) != null) {
+          updateResource(container);
+          LOG.info("Completed container " + container);
+        }
+        completedContainers.add(container);
+        LOG.info("Removed completed container " + container.getId() + " on node " +
+            getHostName());
+      } else if (!isValidContainer(container)) {
+        // Not complete and not tracked by this node: report it for clean-up.
+        containersToCleanUp.add(container);
+      }
+    }
+    containersToCleanUp.addAll(containersToClean);
+    /* clear out containers to clean */
+    containersToClean.clear();
+    // NOTE(review): unlike containersToClean, finishedApplications is never
+    // cleared, so each response repeats all of them - confirm this is intended.
+    lastfinishedApplications.addAll(finishedApplications);
+    return new NodeResponse(lastfinishedApplications, completedContainers,
+        containersToCleanUp);
+  }
+
+  /** Charges the container's resource to this node and records the container as allocated. */
+  private synchronized void allocateContainer(Container container) {
+    deductAvailableResource(container.getResource());
+    ++numContainers;
+    allocatedContainers.put(container.getId(), container);
+    LOG.info("Allocated container " + container.getId() +
+        " to node " + getHostName());
+
+    LOG.info("Assigned container " + container.getId() +
+        " of capacity " + container.getResource() + " on host " + getHostName() +
+        ", which currently has " + numContainers + " containers, " +
+        getUsedResource() + " used and " +
+        getAvailableResource() + " available");
+  }
+
+  /** @return true if the container is tracked as active or allocated on this node. */
+  private synchronized boolean isValidContainer(Container c) {
+    return activeContainers.containsKey(c.getId())
+        || allocatedContainers.containsKey(c.getId());
+  }
+
+  /** Gives the container's resource back to this node and decrements the container count. */
+  private synchronized void updateResource(Container container) {
+    addAvailableResource(container.getResource());
+    --numContainers;
+  }
+
+  /**
+   * Release an allocated container on this node.
+   * @param container container to be released
+   * @return <code>true</code> if the container was allocated or active on
+   *         this node and has been released, <code>false</code> otherwise
+   */
+  public synchronized boolean releaseContainer(Container container) {
+    if (!isValidContainer(container)) {
+      LOG.error("Invalid container released " + container);
+      return false;
+    }
+
+    // Stop tracking the container, whether or not it was launched, and
+    // queue it so that the next status update returns it for clean-up.
+    activeContainers.remove(container.getId());
+    allocatedContainers.remove(container.getId());
+    containersToClean.add(container);
+    updateResource(container);
+
+    LOG.info("Released container " + container.getId() +
+        " of capacity " + container.getResource() + " on host " + getHostName() +
+        ", which currently has " + numContainers + " containers, " +
+        getUsedResource() + " used and " + getAvailableResource()
+        + " available" + ", release resources=" + true);
+    return true;
+  }
+
+  @Override
+  public NodeId getNodeID() {
+    return this.nodeId;
+  }
+
+  @Override
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  @Override
+  public Resource getTotalCapability() {
+    return this.totalCapability;
+  }
+
+  @Override
+  public String getRackName() {
+    return node.getNetworkLocation();
+  }
+
+  @Override
+  public Node getNode() {
+    return this.node;
+  }
+
+  @Override
+  public synchronized Resource getAvailableResource() {
+    return this.availableResource;
+  }
+
+  @Override
+  public synchronized Resource getUsedResource() {
+    return this.usedResource;
+  }
+
+  /** Moves <code>resource</code> from used back to available; null is logged and ignored. */
+  public synchronized void addAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid resource addition of null resource for " + this.hostName);
+      return;
+    }
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(availableResource, resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(usedResource, resource);
+  }
+
+  /** Moves <code>resource</code> from available to used; null is logged and ignored. */
+  public synchronized void deductAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid deduction of null resource for "+ this.hostName);
+      // Bail out as addAvailableResource does rather than passing null on.
+      return;
+    }
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(availableResource, resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(usedResource, resource);
+  }
+
+  /** Records the application as finished; it is then returned with status-update responses. */
+  public synchronized void notifyFinishedApplication(ApplicationId applicationId) {
+    finishedApplications.add(applicationId);
+    /* make sure to iterate through the list and remove all the containers that
+     * belong to this application. */
+  }
+
+  @Override
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  @Override
+  public NodeHealthStatus getNodeHealthStatus() {
+    synchronized (this.nodeHealthStatus) {
+      return this.nodeHealthStatus;
+    }
+  }
+
+  @Override
+  public void updateHealthStatus(NodeHealthStatus healthStatus) {
+    synchronized (this.nodeHealthStatus) {
+      this.nodeHealthStatus.setIsNodeHealthy(healthStatus.getIsNodeHealthy());
+      this.nodeHealthStatus.setHealthReport(healthStatus.getHealthReport());
+      this.nodeHealthStatus.setLastHealthReportTime(healthStatus
+          .getLastHealthReportTime());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "host: " + getHostName() + " #containers=" + getNumContainers() +
+      " available=" + getAvailableResource().getMemory() +
+      " used=" + getUsedResource().getMemory();
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Tue Apr  5 08:40:05 2011
@@ -39,14 +39,9 @@ public interface ResourceListener {
  
   /**
    * add a node to the resource listener.
-   * @param nodeId the nodeid of the node
-   * @param hostName the hostname of this node.
-   * @param node the topology information.
-   * @param capability the resource  capability of the node.
-   * @return the {@link NodeInfo} object that tracks this nodemanager.
+   * @param nodeManager the nodeManager view
    */
-  public NodeInfo addNode(NodeId nodeId,String hostName,
-      Node node, Resource capability);
+  public void addNode(NodeManager nodeManager);
   
   /**
    * A node has been removed from the cluster.

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Apr  5 08:40:05 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -394,13 +395,10 @@ implements ResourceScheduler, CapacitySc
   }
  
   @Override
-  public synchronized NodeInfo addNode(NodeId nodeId, 
-      String hostName, Node node, Resource capability) {
-    NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+  public synchronized void addNode(NodeManager nodeManager) {
     nodes.put(nodeManager.getHostName(), nodeManager);
     org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
         clusterResource, nodeManager.getTotalCapability());
-    return nodeManager;
   }
 
   public synchronized boolean releaseContainer(ApplicationId applicationId, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Apr  5 08:40:05 2011
@@ -468,13 +468,10 @@ public class FifoScheduler implements Re
   }
  
   @Override
-  public synchronized NodeInfo addNode(NodeId nodeId, 
-      String hostName, Node node, Resource capability) {
-    NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+  public synchronized void addNode(NodeManager nodeManager) {
     nodes.put(nodeManager.getHostName(), nodeManager);
     org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
         clusterResource, nodeManager.getTotalCapability());
-    return nodeManager;
   }
 
   public synchronized boolean releaseContainer(ApplicationId applicationId, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Tue Apr  5 08:40:05 2011
@@ -48,6 +48,9 @@ class NodesPage extends RmView {
           th(".rack", "Rack").
           th(".nodeid", "Node ID").
           th(".host", "Host").
+          th(".healthStatus", "Health-status").
+          th(".lastHealthUpdate", "Last health-update").
+          th(".healthReport", "Health-report").
           th(".containers", "Containers").
           th(".mem", "Mem Used (MB)").
           th(".mem", "Mem Avail (MB)")._()._().
@@ -57,6 +60,11 @@ class NodesPage extends RmView {
             td(ni.getRackName()).
             td(String.valueOf(ni.getNodeID().getId())).
             td(ni.getHostName()).
+            td(ni.getNodeHealthStatus().getIsNodeHealthy() ? "Healthy"
+                : "Unhealthy").
+            td(String.valueOf(ni.getNodeHealthStatus()
+                .getLastHealthReportTime())).
+            td(String.valueOf(ni.getNodeHealthStatus().getHealthReport())).
             td(String.valueOf(ni.getNumContainers())).
             td(String.valueOf(ni.getUsedResource().getMemory())).
             td(String.valueOf(ni.getAvailableResource().getMemory()))._();
@@ -79,8 +87,8 @@ class NodesPage extends RmView {
 
   private String nodesTableInit() {
     return tableInit().
-        // rack, nodeid, host, containers, memused, memavail
-        append(", aoColumns:[null, null, null, {bSearchable:false}, ").
+        // rack, nodeid, host, healthStatus, containers, memused, memavail
+        append(", aoColumns:[null, null, null, null, {bSearchable:false}, ").
         append("{bSearchable:false}, {bSearchable:false}]}").toString();
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Tue Apr  5 08:40:05 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
@@ -76,6 +77,8 @@ public class MockNodes {
     final int nid = NODE_ID++;
     final NodeId nodeID = newNodeID(nid);
     final String hostName = "host"+ nid;
+    final NodeHealthStatus nodeHealthStatus =
+        recordFactory.newRecordInstance(NodeHealthStatus.class);
     final Resource used = newUsedResource(perNode);
     final Resource avail = newAvailResource(perNode, used);
     final int containers = (int)(Math.random() * 8);
@@ -119,6 +122,11 @@ public class MockNodes {
       public int getNumContainers() {
         return containers;
       }
+
+      @Override
+      public NodeHealthStatus getNodeHealthStatus() {
+        return nodeHealthStatus;
+      }
     };
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Tue Apr  5 08:40:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -129,9 +130,7 @@ public class TestAMRestart extends TestC
 
   private class DummyResourceScheduler implements ResourceScheduler {
     @Override
-    public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
-        Resource capability) {
-      return null;
+    public void addNode(NodeManager nodeManager) {
     }
     @Override
     public void removeNode(NodeInfo node) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Tue Apr  5 08:40:05 2011
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -259,7 +261,10 @@ public class TestApplicationCleanup exte
     Node node = new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(1024);
-    return scheduler.addNode(nodeId, hostName, node, capability);
+    NodeManager nodeManager =
+      new NodeManagerImpl(nodeId, hostName, node, capability);
+    scheduler.addNode(nodeManager);
+    return nodeManager;
   }
 
   @Test

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Tue Apr  5 08:40:05 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -80,9 +81,7 @@ public class TestSchedulerNegotiator ext
         ContainerTokenSecretManager secretManager) {
     }
     @Override
-    public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
-        Resource capability) {
-      return null;
+    public void addNode(NodeManager nodeManager) {
     }
     @Override
     public NodeResponse nodeUpdate(NodeInfo nodeInfo,

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Tue Apr  5 08:40:05 2011
@@ -61,9 +61,7 @@ public class TestNMExpiry extends TestCa
 
   private static class VoidResourceListener implements ResourceListener {
     @Override
-    public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
-        Resource capability) {
-      return new NodeManager(nodeId, hostName, node, capability);
+    public void addNode(NodeManager nodeManager) {      
     }
     @Override
     public void removeNode(NodeInfo node) {
@@ -111,17 +109,22 @@ public class TestNMExpiry extends TestCa
     resourceTracker.start();
   }
 
-  private class TestThread extends Thread {
+  private class ThirdNodeHeartBeatThread extends Thread {
     public void run() {
-      HeartbeatResponse res = recordFactory.newRecordInstance(HeartbeatResponse.class);
+      int lastResponseID = 0;
       while (!stopT) {
         try {
-          org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
-          nodeStatus.setNodeId(response.getNodeId());
-          nodeStatus.setResponseId(res.getResponseId());
-          NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+          org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
+              recordFactory
+                  .newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
+          nodeStatus.setNodeId(thirdNodeRegResponse.getNodeId());
+          nodeStatus.setResponseId(lastResponseID);
+          NodeHeartbeatRequest request =
+              recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
           request.setNodeStatus(nodeStatus);
-          res = resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+          lastResponseID =
+              resourceTracker.nodeHeartbeat(request).getHeartbeatResponse()
+                  .getResponseId();
         } catch(Exception e) {
           LOG.info("failed to heartbeat ", e);
         }
@@ -130,7 +133,7 @@ public class TestNMExpiry extends TestCa
   }
 
   boolean stopT = false;
-  RegistrationResponse response;
+  RegistrationResponse thirdNodeRegResponse;
 
   @Test
   public void testNMExpiry() throws Exception {
@@ -149,13 +152,16 @@ public class TestNMExpiry extends TestCa
     request3.setResource(capability);
     resourceTracker.registerNodeManager(request1);
     resourceTracker.registerNodeManager(request2);
-    response = resourceTracker.registerNodeManager(request3).getRegistrationResponse();
+    thirdNodeRegResponse =
+        resourceTracker.registerNodeManager(request3)
+            .getRegistrationResponse();
     /* test to see if hostanme 3 does not expire */
     stopT = false;
-    new TestThread().start();
+    new ThirdNodeHeartBeatThread().start();
+    int timeOut = 0;
     synchronized (notify) {
-      while (notify.get() == 0) {
-        notify.wait();
+      while (notify.get() == 0 && timeOut++ < 30) {
+        notify.wait(1000);
       }
     }
     if (test.get() != 2) 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Tue Apr  5 08:40:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -55,10 +56,8 @@ public class TestRMNMRPCResponseId exten
   private class DummyResourceListener implements ResourceListener {
 
     @Override
-    public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
-        Resource capability) {
-      nodeid = nodeId;
-      return new NodeManager(nodeId, hostName, node, capability);
+    public void addNode(NodeManager nodeManager) {
+      nodeid = nodeManager.getNodeID();
     }
 
     @Override