You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/04/05 10:40:07 UTC
svn commit: r1088923 [2/2] - in /hadoop/mapreduce/branches/MR-279:
mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
src/contrib/fairsc...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1088923&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Tue Apr 5 08:40:05 2011
@@ -0,0 +1,340 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+
+/**
+ * This class is used by ClusterInfo to keep track of all the applications/containers
+ * running on a node.
+ *
+ */
+@Private
+@Unstable
+public class NodeManagerImpl implements NodeManager {
+ private static final Log LOG = LogFactory.getLog(NodeManager.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private final NodeId nodeId;
+ private final String hostName;
+ private Resource totalCapability;
+ private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
+ private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+ private final Node node;
+ private final NodeHealthStatus nodeHealthStatus = recordFactory
+ .newRecordInstance(NodeHealthStatus.class);
+
+ private static final Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+ private static final List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+ private static final ApplicationId[] EMPTY_APPLICATION_ARRAY = new ApplicationId[]{};
+ private static final List<ApplicationId> EMPTY_APPLICATION_LIST = Arrays.asList(EMPTY_APPLICATION_ARRAY);
+
+ public static final String ANY = "*";
+ /* set of containers that are allocated containers */
+ private final Map<ContainerId, Container> allocatedContainers =
+ new TreeMap<ContainerId, Container>();
+
+ /* set of containers that are currently active on a node manager */
+ private final Map<ContainerId, Container> activeContainers =
+ new TreeMap<ContainerId, Container>();
+
+ /* set of containers that need to be cleaned */
+ private final Set<Container> containersToClean =
+ new TreeSet<Container>(new org.apache.hadoop.yarn.server.resourcemanager.resource.Container.Comparator());
+
+
+ /* the list of applications that have finished and need to be purged */
+ private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+
+ private volatile int numContainers;
+
+ public NodeManagerImpl(NodeId nodeId, String hostname,
+ Node node, Resource capability) {
+ this.nodeId = nodeId;
+ this.totalCapability = capability;
+ this.hostName = hostname;
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ availableResource, capability);
+ this.node = node;
+ }
+
+ /**
+ * NodeInfo for this node.
+ * @return the {@link NodeInfo} for this node.
+ */
+ public NodeInfo getNodeInfo() {
+ return this;
+ }
+
+ /**
+ * The Scheduler has allocated containers on this node to the
+ * given application.
+ *
+ * @param applicationId application
+ * @param containers allocated containers
+ */
+ public synchronized void allocateContainer(ApplicationId applicationId,
+ List<Container> containers) {
+ if (containers == null) {
+ LOG.error("Adding null containers for application " + applicationId);
+ return;
+ }
+ for (Container container : containers) {
+ allocateContainer(container);
+ }
+
+ LOG.info("addContainers:" +
+ " node=" + getHostName() +
+ " #containers=" + containers.size() +
+ " available=" + getAvailableResource().getMemory() +
+ " used=" + getUsedResource().getMemory());
+ }
+
+ /**
+ * Status update from the NodeManager
+ * @param nodeStatus node status
+ * @return the set of containers no longer should be used by the
+ * node manager.
+ */
+ public synchronized NodeResponse
+ statusUpdate(Map<String,List<Container>> allContainers) {
+
+ if (allContainers == null) {
+ return new NodeResponse(EMPTY_APPLICATION_LIST, EMPTY_CONTAINER_LIST,
+ EMPTY_CONTAINER_LIST);
+ }
+
+ List<Container> listContainers = new ArrayList<Container>();
+ // Iterate through the running containers and update their status
+ for (Map.Entry<String, List<Container>> e :
+ allContainers.entrySet()) {
+ listContainers.addAll(e.getValue());
+ }
+ NodeResponse statusCheck = update(listContainers);
+ return statusCheck;
+ }
+
+ /**
+ * Status update for an application running on a given node
+ * @param node node
+ * @param containers containers update.
+ * @return containers that are completed or need to be preempted.
+ */
+ private synchronized NodeResponse update(List<Container> containers) {
+ List<Container> completedContainers = new ArrayList<Container>();
+ List<Container> containersToCleanUp = new ArrayList<Container>();
+ List<ApplicationId> lastfinishedApplications = new ArrayList<ApplicationId>();
+
+ for (Container container : containers) {
+ if (allocatedContainers.remove(container.getId()) != null) {
+ activeContainers.put(container.getId(), container);
+ LOG.info("Activated container " + container.getId() + " on node " +
+ getHostName());
+ }
+
+ if (container.getState() == ContainerState.COMPLETE) {
+ if (activeContainers.remove(container.getId()) != null) {
+ updateResource(container);
+ LOG.info("Completed container " + container);
+ }
+ completedContainers.add(container);
+ LOG.info("Removed completed container " + container.getId() + " on node " +
+ getHostName());
+ }
+ else if (container.getState() != ContainerState.COMPLETE &&
+ (!allocatedContainers.containsKey(container.getId())) &&
+ !activeContainers.containsKey(container.getId())) {
+ containersToCleanUp.add(container);
+ }
+ }
+ containersToCleanUp.addAll(containersToClean);
+ /* clear out containers to clean */
+ containersToClean.clear();
+ lastfinishedApplications.addAll(finishedApplications);
+ return new NodeResponse(lastfinishedApplications, completedContainers,
+ containersToCleanUp);
+ }
+
+ private synchronized void allocateContainer(Container container) {
+ deductAvailableResource(container.getResource());
+ ++numContainers;
+
+ allocatedContainers.put(container.getId(), container);
+ LOG.info("Allocated container " + container.getId() +
+ " to node " + getHostName());
+
+ LOG.info("Assigned container " + container.getId() +
+ " of capacity " + container.getResource() + " on host " + getHostName() +
+ ", which currently has " + numContainers + " containers, " +
+ getUsedResource() + " used and " +
+ getAvailableResource() + " available");
+ }
+
+ private synchronized boolean isValidContainer(Container c) {
+ if (activeContainers.containsKey(c.getId()) || allocatedContainers.containsKey(c.getId()))
+ return true;
+ return false;
+ }
+
+ private synchronized void updateResource(Container container) {
+ addAvailableResource(container.getResource());
+ --numContainers;
+ }
+
+ /**
+ * Release an allocated container on this node.
+ * @param container container to be released
+ * @return <code>true</code> iff the container was unused,
+ * <code>false</code> otherwise
+ */
+ public synchronized boolean releaseContainer(Container container) {
+ if (!isValidContainer(container)) {
+ LOG.error("Invalid container released " + container);
+ return false;
+ }
+
+ /* remove the containers from the nodemanger */
+
+ // Was this container launched?
+ activeContainers.remove(container.getId());
+ allocatedContainers.remove(container.getId());
+ containersToClean.add(container);
+ updateResource(container);
+
+ LOG.info("Released container " + container.getId() +
+ " of capacity " + container.getResource() + " on host " + getHostName() +
+ ", which currently has " + numContainers + " containers, " +
+ getUsedResource() + " used and " + getAvailableResource()
+ + " available" + ", release resources=" + true);
+ return true;
+ }
+
+ @Override
+ public NodeId getNodeID() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getHostName() {
+ return this.hostName;
+ }
+
+ @Override
+ public Resource getTotalCapability() {
+ return this.totalCapability;
+ }
+
+ @Override
+ public String getRackName() {
+ return node.getNetworkLocation();
+ }
+
+ @Override
+ public Node getNode() {
+ return this.node;
+ }
+
+ @Override
+ public synchronized Resource getAvailableResource() {
+ return this.availableResource;
+ }
+
+ @Override
+ public synchronized Resource getUsedResource() {
+ return this.usedResource;
+ }
+
+ public synchronized void addAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid resource addition of null resource for " + this.hostName);
+ return;
+ }
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ availableResource, resource);
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ usedResource, resource);
+ }
+
+ public synchronized void deductAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid deduction of null resource for "+ this.hostName);
+ }
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ availableResource, resource);
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ usedResource, resource);
+ }
+
+ public synchronized void notifyFinishedApplication(ApplicationId applicationId) {
+ finishedApplications.add(applicationId);
+ /* make sure to iterate through the list and remove all the containers that
+ * belong to this application.
+ */
+ }
+
+ @Override
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ @Override
+ public NodeHealthStatus getNodeHealthStatus() {
+ synchronized (this.nodeHealthStatus) {
+ return this.nodeHealthStatus;
+ }
+ }
+
+ @Override
+ public void updateHealthStatus(NodeHealthStatus healthStatus) {
+ synchronized (this.nodeHealthStatus) {
+ this.nodeHealthStatus.setIsNodeHealthy(healthStatus.getIsNodeHealthy());
+ this.nodeHealthStatus.setHealthReport(healthStatus.getHealthReport());
+ this.nodeHealthStatus.setLastHealthReportTime(healthStatus
+ .getLastHealthReportTime());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "host: " + getHostName() + " #containers=" + getNumContainers() +
+ " available=" + getAvailableResource().getMemory() +
+ " used=" + getUsedResource().getMemory();
+ }
+ }
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Tue Apr 5 08:40:05 2011
@@ -39,14 +39,9 @@ public interface ResourceListener {
/**
* add a node to the resource listener.
- * @param nodeId the nodeid of the node
- * @param hostName the hostname of this node.
- * @param node the topology information.
- * @param capability the resource capability of the node.
- * @return the {@link NodeInfo} object that tracks this nodemanager.
+ * @param nodeManager the nodeManager view
*/
- public NodeInfo addNode(NodeId nodeId,String hostName,
- Node node, Resource capability);
+ public void addNode(NodeManager nodeManager);
/**
* A node has been removed from the cluster.
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Apr 5 08:40:05 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -394,13 +395,10 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public synchronized NodeInfo addNode(NodeId nodeId,
- String hostName, Node node, Resource capability) {
- NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+ public synchronized void addNode(NodeManager nodeManager) {
nodes.put(nodeManager.getHostName(), nodeManager);
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
clusterResource, nodeManager.getTotalCapability());
- return nodeManager;
}
public synchronized boolean releaseContainer(ApplicationId applicationId,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Apr 5 08:40:05 2011
@@ -468,13 +468,10 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized NodeInfo addNode(NodeId nodeId,
- String hostName, Node node, Resource capability) {
- NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+ public synchronized void addNode(NodeManager nodeManager) {
nodes.put(nodeManager.getHostName(), nodeManager);
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
clusterResource, nodeManager.getTotalCapability());
- return nodeManager;
}
public synchronized boolean releaseContainer(ApplicationId applicationId,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Tue Apr 5 08:40:05 2011
@@ -48,6 +48,9 @@ class NodesPage extends RmView {
th(".rack", "Rack").
th(".nodeid", "Node ID").
th(".host", "Host").
+ th(".healthStatus", "Health-status").
+ th(".lastHealthUpdate", "Last health-update").
+ th(".healthReport", "Health-report").
th(".containers", "Containers").
th(".mem", "Mem Used (MB)").
th(".mem", "Mem Avail (MB)")._()._().
@@ -57,6 +60,11 @@ class NodesPage extends RmView {
td(ni.getRackName()).
td(String.valueOf(ni.getNodeID().getId())).
td(ni.getHostName()).
+ td(ni.getNodeHealthStatus().getIsNodeHealthy() ? "Healthy"
+ : "Unhealthy").
+ td(String.valueOf(ni.getNodeHealthStatus()
+ .getLastHealthReportTime())).
+ td(String.valueOf(ni.getNodeHealthStatus().getHealthReport())).
td(String.valueOf(ni.getNumContainers())).
td(String.valueOf(ni.getUsedResource().getMemory())).
td(String.valueOf(ni.getAvailableResource().getMemory()))._();
@@ -79,8 +87,8 @@ class NodesPage extends RmView {
private String nodesTableInit() {
return tableInit().
- // rack, nodeid, host, containers, memused, memavail
- append(", aoColumns:[null, null, null, {bSearchable:false}, ").
+ // rack, nodeid, host, healthStatus, containers, memused, memavail
+ append(", aoColumns:[null, null, null, null, {bSearchable:false}, ").
append("{bSearchable:false}, {bSearchable:false}]}").toString();
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Tue Apr 5 08:40:05 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
@@ -76,6 +77,8 @@ public class MockNodes {
final int nid = NODE_ID++;
final NodeId nodeID = newNodeID(nid);
final String hostName = "host"+ nid;
+ final NodeHealthStatus nodeHealthStatus =
+ recordFactory.newRecordInstance(NodeHealthStatus.class);
final Resource used = newUsedResource(perNode);
final Resource avail = newAvailResource(perNode, used);
final int containers = (int)(Math.random() * 8);
@@ -119,6 +122,11 @@ public class MockNodes {
public int getNumContainers() {
return containers;
}
+
+ @Override
+ public NodeHealthStatus getNodeHealthStatus() {
+ return nodeHealthStatus;
+ }
};
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Tue Apr 5 08:40:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -129,9 +130,7 @@ public class TestAMRestart extends TestC
private class DummyResourceScheduler implements ResourceScheduler {
@Override
- public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
- Resource capability) {
- return null;
+ public void addNode(NodeManager nodeManager) {
}
@Override
public void removeNode(NodeInfo node) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Tue Apr 5 08:40:05 2011
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -259,7 +261,10 @@ public class TestApplicationCleanup exte
Node node = new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(1024);
- return scheduler.addNode(nodeId, hostName, node, capability);
+ NodeManager nodeManager =
+ new NodeManagerImpl(nodeId, hostName, node, capability);
+ scheduler.addNode(nodeManager);
+ return nodeManager;
}
@Test
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Tue Apr 5 08:40:05 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -80,9 +81,7 @@ public class TestSchedulerNegotiator ext
ContainerTokenSecretManager secretManager) {
}
@Override
- public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
- Resource capability) {
- return null;
+ public void addNode(NodeManager nodeManager) {
}
@Override
public NodeResponse nodeUpdate(NodeInfo nodeInfo,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Tue Apr 5 08:40:05 2011
@@ -61,9 +61,7 @@ public class TestNMExpiry extends TestCa
private static class VoidResourceListener implements ResourceListener {
@Override
- public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
- Resource capability) {
- return new NodeManager(nodeId, hostName, node, capability);
+ public void addNode(NodeManager nodeManager) {
}
@Override
public void removeNode(NodeInfo node) {
@@ -111,17 +109,22 @@ public class TestNMExpiry extends TestCa
resourceTracker.start();
}
- private class TestThread extends Thread {
+ private class ThirdNodeHeartBeatThread extends Thread {
public void run() {
- HeartbeatResponse res = recordFactory.newRecordInstance(HeartbeatResponse.class);
+ int lastResponseID = 0;
while (!stopT) {
try {
- org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
- nodeStatus.setNodeId(response.getNodeId());
- nodeStatus.setResponseId(res.getResponseId());
- NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+ org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
+ recordFactory
+ .newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
+ nodeStatus.setNodeId(thirdNodeRegResponse.getNodeId());
+ nodeStatus.setResponseId(lastResponseID);
+ NodeHeartbeatRequest request =
+ recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
- res = resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+ lastResponseID =
+ resourceTracker.nodeHeartbeat(request).getHeartbeatResponse()
+ .getResponseId();
} catch(Exception e) {
LOG.info("failed to heartbeat ", e);
}
@@ -130,7 +133,7 @@ public class TestNMExpiry extends TestCa
}
boolean stopT = false;
- RegistrationResponse response;
+ RegistrationResponse thirdNodeRegResponse;
@Test
public void testNMExpiry() throws Exception {
@@ -149,13 +152,16 @@ public class TestNMExpiry extends TestCa
request3.setResource(capability);
resourceTracker.registerNodeManager(request1);
resourceTracker.registerNodeManager(request2);
- response = resourceTracker.registerNodeManager(request3).getRegistrationResponse();
+ thirdNodeRegResponse =
+ resourceTracker.registerNodeManager(request3)
+ .getRegistrationResponse();
/* test to see if hostanme 3 does not expire */
stopT = false;
- new TestThread().start();
+ new ThirdNodeHeartBeatThread().start();
+ int timeOut = 0;
synchronized (notify) {
- while (notify.get() == 0) {
- notify.wait();
+ while (notify.get() == 0 && timeOut++ < 30) {
+ notify.wait(1000);
}
}
if (test.get() != 2)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1088923&r1=1088922&r2=1088923&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Tue Apr 5 08:40:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -55,10 +56,8 @@ public class TestRMNMRPCResponseId exten
private class DummyResourceListener implements ResourceListener {
@Override
- public NodeInfo addNode(NodeId nodeId, String hostName, Node node,
- Resource capability) {
- nodeid = nodeId;
- return new NodeManager(nodeId, hostName, node, capability);
+ public void addNode(NodeManager nodeManager) {
+ nodeid = nodeManager.getNodeID();
}
@Override