You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/28 19:40:46 UTC

svn commit: r1190468 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serv...

Author: vinodkv
Date: Fri Oct 28 17:40:45 2011
New Revision: 1190468

URL: http://svn.apache.org/viewvc?rev=1190468&view=rev
Log:
MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a decommissioned node to shutdown. Contributed by Devaraj K.
svn merge -c r1190467 --ignore-ancestry ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
      - copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
      - copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
      - copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 28 17:40:45 2011
@@ -1786,6 +1786,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv)
 
+    MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a
+    decommissioned node to shutdown. (Devaraj K via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java Fri Oct 28 17:40:45 2011
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.record
 
 public interface HeartbeatResponse {
   int getResponseId();
-  boolean getReboot();
+  NodeAction getNodeAction();
   
   List<ContainerId> getContainersToCleanupList();
   ContainerId getContainerToCleanup(int index);
@@ -35,7 +35,7 @@ public interface HeartbeatResponse {
   int getApplicationsToCleanupCount();
   
   void setResponseId(int responseId);
-  void setReboot(boolean reboot);
+  void setNodeAction(NodeAction action);
   
   void addAllContainersToCleanup(List<ContainerId> containers);
   void addContainerToCleanup(ContainerId container);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java Fri Oct 28 17:40:45 2011
@@ -23,4 +23,8 @@ public interface RegistrationResponse {
   public abstract ByteBuffer getSecretKey();
   
   public abstract void setSecretKey(ByteBuffer secretKey);
+  
+  public abstract NodeAction getNodeAction();
+  
+  public abstract void setNodeAction(NodeAction nodeAction);
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java Fri Oct 28 17:40:45 2011
@@ -32,11 +32,12 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
-
-    
-public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
+public class HeartbeatResponsePBImpl extends
+    ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
   HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
   HeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
@@ -100,16 +101,24 @@ public class HeartbeatResponsePBImpl ext
     builder.setResponseId((responseId));
   }
   @Override
-  public boolean getReboot() {
+  public NodeAction getNodeAction() {
     HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    return (p.getReboot());
+    if(!p.hasNodeAction()) {
+      return null;
+    }
+    return (convertFromProtoFormat(p.getNodeAction()));
   }
 
   @Override
-  public void setReboot(boolean reboot) {
+  public void setNodeAction(NodeAction nodeAction) {
     maybeInitBuilder();
-    builder.setReboot((reboot));
+    if (nodeAction == null) {
+      builder.clearNodeAction();
+      return;
+    }
+    builder.setNodeAction(convertToProtoFormat(nodeAction));
   }
+
   @Override
   public List<ContainerId> getContainersToCleanupList() {
     initContainersToCleanup();
@@ -296,7 +305,12 @@ public class HeartbeatResponsePBImpl ext
   private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
     return ((ApplicationIdPBImpl)t).getProto();
   }
-
-
-
+  
+  private NodeAction convertFromProtoFormat(NodeActionProto p) {
+    return  NodeAction.valueOf(p.name());
+  }
+  
+  private NodeActionProto convertToProtoFormat(NodeAction t) {
+    return NodeActionProto.valueOf(t.name());
+  }
 }  

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java Fri Oct 28 17:40:45 2011
@@ -21,17 +21,15 @@ package org.apache.hadoop.yarn.server.ap
 
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 
-
-    
-public class RegistrationResponsePBImpl extends ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
+public class RegistrationResponsePBImpl extends
+    ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
   RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
   RegistrationResponseProto.Builder builder = null;
   boolean viaProto = false;
@@ -98,4 +96,31 @@ public class RegistrationResponsePBImpl 
     this.secretKey = secretKey;
   }
 
+  @Override
+  public NodeAction getNodeAction() {
+    RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if(!p.hasNodeAction()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getNodeAction());
+  }
+
+  @Override
+  public void setNodeAction(NodeAction nodeAction) {
+    maybeInitBuilder();
+    if (nodeAction == null) {
+      builder.clearNodeAction();
+      return;
+    }
+    builder.setNodeAction(convertToProtoFormat(nodeAction));
+  }
+  
+  private NodeAction convertFromProtoFormat(NodeActionProto p) {
+    return  NodeAction.valueOf(p.name());
+  }
+  
+  private NodeActionProto convertToProtoFormat(NodeAction t) {
+    return NodeActionProto.valueOf(t.name());
+  }
+
 }  

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Fri Oct 28 17:40:45 2011
@@ -23,6 +23,12 @@ option java_generate_equals_and_hash = t
 
 import "yarn_protos.proto";
 
+enum NodeActionProto {
+  NORMAL = 0;
+  REBOOT = 1;
+  SHUTDOWN = 2;
+}
+
 message NodeStatusProto {
   optional NodeIdProto node_id = 1;
   optional int32 response_id = 2;
@@ -32,11 +38,12 @@ message NodeStatusProto {
 
 message RegistrationResponseProto {
   optional bytes secret_key = 1;
+  optional NodeActionProto nodeAction = 2;
 }
 
 message HeartbeatResponseProto {
   optional int32 response_id = 1;
-  optional bool reboot = 2;
+  optional NodeActionProto nodeAction = 2;
   repeated ContainerIdProto containers_to_cleanup = 3;
   repeated ApplicationIdProto applications_to_cleanup = 4;
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri Oct 28 17:40:45 2011
@@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.server.sec
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
 import org.apache.hadoop.yarn.util.Records;
 
-public class NodeManager extends CompositeService {
+public class NodeManager extends CompositeService implements
+    ServiceStateChangeListener {
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
   protected ContainerTokenSecretManager containerTokenSecretManager;
@@ -123,6 +125,8 @@ public class NodeManager extends Composi
     NodeStatusUpdater nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, healthChecker, 
         this.containerTokenSecretManager);
+    
+    nodeStatusUpdater.register(this);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
@@ -206,6 +210,16 @@ public class NodeManager extends Composi
     }
   }
 
+  
+  @Override
+  public void stateChanged(Service service) {
+    // Shut down the NodeManager when the NodeStatusUpdater is stopped.
+    if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
+        && STATE.STOPPED.equals(service.getServiceState())) {
+      stop();
+    }
+  }
+  
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
     try {
@@ -220,5 +234,4 @@ public class NodeManager extends Composi
       System.exit(-1);
     }
   }
-
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Fri Oct 28 17:40:45 2011
@@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -160,6 +160,12 @@ public class NodeStatusUpdaterImpl exten
     request.setNodeId(this.nodeId);
     RegistrationResponse regResponse =
         this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+    // Abort registration if the ResourceManager instructs this NM to shut down.
+    if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+      throw new YarnException(
+          "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+    }
+    
     if (UserGroupInformation.isSecurityEnabled()) {
       this.secretKeyBytes = regResponse.getSecretKey().array();
     }
@@ -248,10 +254,25 @@ public class NodeStatusUpdaterImpl exten
             NodeStatus nodeStatus = getNodeStatus();
             nodeStatus.setResponseId(lastHeartBeatID);
             
-            NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+            NodeHeartbeatRequest request = recordFactory
+                .newRecordInstance(NodeHeartbeatRequest.class);
             request.setNodeStatus(nodeStatus);            
             HeartbeatResponse response =
               resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+              LOG
+                  .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
+                  		" hence shutting down.");
+              NodeStatusUpdaterImpl.this.stop();
+              break;
+            }
+            if (response.getNodeAction() == NodeAction.REBOOT) {
+              LOG.info("Node is out of sync with ResourceManager,"
+                  + " hence shutting down.");
+              NodeStatusUpdaterImpl.this.stop();
+              break;
+            }
+
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanupList();
@@ -269,7 +290,6 @@ public class NodeStatusUpdaterImpl exten
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);
-            break;
           }
         }
       }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Fri Oct 28 17:40:45 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -85,10 +86,15 @@ public class TestNodeStatusUpdater {
   volatile Error nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
   private final Configuration conf = new YarnConfiguration();
+  private NodeManager nm;
 
   @After
   public void tearDown() {
     this.registeredNodes.clear();
+    heartBeatID = 0;
+    if (nm != null) {
+      nm.stop();
+    }
     DefaultMetricsSystem.shutdown();
   }
 
@@ -220,6 +226,7 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@@ -232,10 +239,44 @@ public class TestNodeStatusUpdater {
 
     @Override
     protected ResourceTracker getRMClient() {
-      return new MyResourceTracker(this.context);
+      return resourceTracker;
     }
   }
+  
+  // ResourceTracker stub whose register/heartbeat responses carry a configurable NodeAction.
+  private class MyResourceTracker2 implements ResourceTracker {
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
 
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      RegistrationResponse regResponse = recordFactory
+      .newRecordInstance(RegistrationResponse.class);
+      regResponse.setNodeAction(registerNodeAction );
+      response.setRegistrationResponse(regResponse);
+      return response;
+    }
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+      HeartbeatResponse response = recordFactory
+          .newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(heartBeatID);
+      response.setNodeAction(heartBeatNodeAction);
+      
+      NodeHeartbeatResponse nhResponse = recordFactory
+      .newRecordInstance(NodeHeartbeatResponse.class);
+      nhResponse.setHeartbeatResponse(response);
+      return nhResponse;
+    }
+  }
+  
   @Before
   public void clearError() {
     nmStartError = null;
@@ -249,7 +290,7 @@ public class TestNodeStatusUpdater {
 
   @Test
   public void testNMRegistration() throws InterruptedException {
-    final NodeManager nm = new NodeManager() {
+    nm = new NodeManager() {
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
           Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -295,14 +336,85 @@ public class TestNodeStatusUpdater {
       Assert.fail("NodeManager failed to start");
     }
 
-    while (heartBeatID <= 3) {
+    waitCount = 0;
+    while (heartBeatID <= 3 && waitCount++ != 20) {
       Thread.sleep(500);
     }
+    Assert.assertFalse(heartBeatID <= 3);
     Assert.assertEquals("Number of registered NMs is wrong!!", 1,
         this.registeredNodes.size());
 
     nm.stop();
   }
+  
+  @Test
+  public void testNodeDecommision() throws Exception {
+    nm = getNodeManager(NodeAction.SHUTDOWN);
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    Assert.assertEquals(STATE.INITED, nm.getServiceState());
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+  }
+
+  @Test
+  public void testNodeReboot() throws Exception {
+    nm = getNodeManager(NodeAction.REBOOT);
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    Assert.assertEquals(STATE.INITED, nm.getServiceState());
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+  }
+  
+  @Test
+  public void testNMShutdownForRegistrationFailure() {
+
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          ContainerTokenSecretManager containerTokenSecretManager) {
+        MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics,
+            containerTokenSecretManager);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
+        nodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return nodeStatusUpdater;
+      }
+    };
+    verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+        + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+  }
 
   /**
    * Verifies that if for some reason NM fails to start ContainerManager RPC
@@ -314,7 +426,7 @@ public class TestNodeStatusUpdater {
   @Test
   public void testNoRegistrationWhenNMServicesFail() {
 
-    final NodeManager nm = new NodeManager() {
+    nm = new NodeManager() {
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
           Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -341,16 +453,22 @@ public class TestNodeStatusUpdater {
       }
     };
 
+    verifyNodeStartFailure("Starting of RPC Server failed");
+  }
+
+  private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
     try {
       nm.start();
       Assert.fail("NM should have failed to start. Didn't get exception!!");
     } catch (Exception e) {
-      Assert.assertEquals("Starting of RPC Server failed", e.getCause()
+      Assert.assertEquals(errMessage, e.getCause()
           .getMessage());
     }
-
+    
+    // The state changes to STOPPED only if startup succeeded; otherwise no
+    // state change occurs.
     Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
         .getServiceState());
 
@@ -371,4 +489,21 @@ public class TestNodeStatusUpdater {
         .toUri().getPath());
     return conf;
   }
+  
+  private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
+    return new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          ContainerTokenSecretManager containerTokenSecretManager) {
+        MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics,
+            containerTokenSecretManager);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
+        myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return myNodeStatusUpdater;
+      }
+    };
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Fri Oct 28 17:40:45 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
@@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
@@ -76,11 +75,19 @@ public class ResourceTrackerService exte
 
   private static final NodeHeartbeatResponse reboot = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
+  private static final NodeHeartbeatResponse shutDown = recordFactory
+  .newRecordInstance(NodeHeartbeatResponse.class);
+  
   static {
     HeartbeatResponse rebootResp = recordFactory
         .newRecordInstance(HeartbeatResponse.class);
-    rebootResp.setReboot(true);
+    rebootResp.setNodeAction(NodeAction.REBOOT);
     reboot.setHeartbeatResponse(rebootResp);
+    
+    HeartbeatResponse decommissionedResp = recordFactory
+        .newRecordInstance(HeartbeatResponse.class);
+    decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
+    shutDown.setHeartbeatResponse(decommissionedResp);
   }
 
   public ResourceTrackerService(RMContext rmContext,
@@ -139,6 +146,7 @@ public class ResourceTrackerService exte
     super.stop();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnRemoteException {
@@ -149,121 +157,125 @@ public class ResourceTrackerService exte
     int httpPort = request.getHttpPort();
     Resource capability = request.getResource();
 
-    try {
-      // Check if this node is a 'valid' node
-      if (!this.nodesListManager.isValidNode(host)) {
-        LOG.info("Disallowed NodeManager from  " + host);
-        throw new IOException("Disallowed NodeManager from  " + host); 
-      }
-
-      RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
-          httpPort, resolve(host), capability);
-
-      if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
-        throw new IOException("Duplicate registration from the node!");
-      }
-
-      this.nmLivelinessMonitor.register(nodeId);
-
-      LOG.info("NodeManager from node " + host + 
-          "(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
-          + "registered with capability: " + capability.getMemory()
-          + ", assigned nodeId " + nodeId);
-
-      RegistrationResponse regResponse = recordFactory.newRecordInstance(
-          RegistrationResponse.class);
-      SecretKey secretKey = this.containerTokenSecretManager
-          .createAndGetSecretKey(nodeId.toString());
-      regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+    RegisterNodeManagerResponse response = recordFactory
+        .newRecordInstance(RegisterNodeManagerResponse.class);
+    RegistrationResponse regResponse = recordFactory
+        .newRecordInstance(RegistrationResponse.class);
+    SecretKey secretKey = this.containerTokenSecretManager
+        .createAndGetSecretKey(nodeId.toString());
+    regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+
+    // Check if this node is a 'valid' node
+    if (!this.nodesListManager.isValidNode(host)) {
+      LOG.info("Disallowed NodeManager from  " + host
+          + ", Sending SHUTDOWN signal to the NodeManager.");
+      regResponse.setNodeAction(NodeAction.SHUTDOWN);
+      response.setRegistrationResponse(regResponse);
+      return response;
+    }
+
+    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
+        resolve(host), capability);
 
-      RegisterNodeManagerResponse response = recordFactory
-          .newRecordInstance(RegisterNodeManagerResponse.class);
+    if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
+      LOG.info("Duplicate registration from the node at: " + host
+          + ", Sending SHUTDOWN Signal to the NodeManager");
+      regResponse.setNodeAction(NodeAction.SHUTDOWN);
       response.setRegistrationResponse(regResponse);
       return response;
-    } catch (IOException ioe) {
-      LOG.info("Exception in node registration from " + nodeId.getHost(), ioe);
-      throw RPCUtil.getRemoteException(ioe);
     }
+
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+
+    this.nmLivelinessMonitor.register(nodeId);
+
+    LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
+        + " httpPort: " + httpPort + ") " + "registered with capability: "
+        + capability.getMemory() + ", assigned nodeId " + nodeId);
+
+    regResponse.setNodeAction(NodeAction.NORMAL);
+    response.setRegistrationResponse(regResponse);
+    return response;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
       throws YarnRemoteException {
 
     NodeStatus remoteNodeStatus = request.getNodeStatus();
-    try {
-      /**
-       * Here is the node heartbeat sequence...
-       * 1. Check if it's a registered node
-       * 2. Check if it's a valid (i.e. not excluded) node
-       * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
-       * 4. Send healthStatus to RMNode
-       */
+    /**
+     * Here is the node heartbeat sequence...
+     * 1. Check if it's a registered node
+     * 2. Check if it's a valid (i.e. not excluded) node 
+     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat 
+     * 4. Send healthStatus to RMNode
+     */
+
+    NodeId nodeId = remoteNodeStatus.getNodeId();
+
+    // 1. Check if it's a registered node
+    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+    if (rmNode == null) {
+      /* node does not exist */
+      LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
       
-      NodeId nodeId = remoteNodeStatus.getNodeId();
-      
-      // 1. Check if it's a registered node
-      RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
-      if (rmNode == null) {
-        /* node does not exist */
-        LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
-        return reboot;
-      }
-
-      // Send ping
-      this.nmLivelinessMonitor.receivedPing(nodeId);
-
-      // 2. Check if it's a valid (i.e. not excluded) node
-      if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
-        LOG.info("Disallowed NodeManager nodeId: " + nodeId +  
-            " hostname: " + rmNode.getNodeAddress());
-        throw new IOException("Disallowed NodeManager nodeId: " + 
-            remoteNodeStatus.getNodeId());
-      }
-
-      NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
-          .newRecordInstance(NodeHeartbeatResponse.class);
-
-      // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
-      HeartbeatResponse lastHeartbeatResponse = rmNode
-          .getLastHeartBeatResponse();
-      if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
-           .getResponseId()) {
-        LOG.info("Received duplicate heartbeat from node " + 
-            rmNode.getNodeAddress());
-        nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
-        return nodeHeartBeatResponse;
-      } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
-          .getResponseId()) {
-        LOG.info("Too far behind rm response id:" +
-            lastHeartbeatResponse.getResponseId() + " nm response id:"
-            + remoteNodeStatus.getResponseId());
-        // TODO: Just sending reboot is not enough. Think more.
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
-        return reboot;
-      }
-
-      // Heartbeat response
-      HeartbeatResponse latestResponse = recordFactory
-          .newRecordInstance(HeartbeatResponse.class);
-      latestResponse
-          .setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-      latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
-      latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+      // Updating the metrics directly as reboot event cannot be 
+      // triggered on a null rmNode
+      ClusterMetrics.getMetrics().incrNumRebootedNMs();
+      return reboot;
+    }
+
+    // Send ping
+    this.nmLivelinessMonitor.receivedPing(nodeId);
 
-      // 4. Send status to RMNode, saving the latest response.
+    // 2. Check if it's a valid (i.e. not excluded) node
+    if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
+      LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+          + rmNode.getNodeAddress());
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-              remoteNodeStatus.getContainersStatuses(), latestResponse));
+          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      return shutDown;
+    }
+
+    NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
+        .newRecordInstance(NodeHeartbeatResponse.class);
 
-      nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
+    // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+    HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
+    if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
+        .getResponseId()) {
+      LOG.info("Received duplicate heartbeat from node "
+          + rmNode.getNodeAddress());
+      nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
       return nodeHeartBeatResponse;
-    } catch (IOException ioe) {
-      LOG.info("Exception in heartbeat from node " + 
-          request.getNodeStatus().getNodeId(), ioe);
-      throw RPCUtil.getRemoteException(ioe);
+    } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
+        .getResponseId()) {
+      LOG.info("Too far behind rm response id:"
+          + lastHeartbeatResponse.getResponseId() + " nm response id:"
+          + remoteNodeStatus.getResponseId());
+      // TODO: Just sending reboot is not enough. Think more.
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
+      return reboot;
     }
+
+    // Heartbeat response
+    HeartbeatResponse latestResponse = recordFactory
+        .newRecordInstance(HeartbeatResponse.class);
+    latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
+    latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
+    latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+    latestResponse.setNodeAction(NodeAction.NORMAL);
+
+    // 4. Send status to RMNode, saving the latest response.
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
+            remoteNodeStatus.getContainersStatuses(), latestResponse));
+
+    nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
+    return nodeHeartBeatResponse;
   }
 
   public void recover(RMState state) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java Fri Oct 28 17:40:45 2011
@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 public enum RMNodeEventType {
+  
+  STARTED,
+  
   // Source: AdminService
   DECOMMISSION,
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Fri Oct 28 17:40:45 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -107,9 +108,11 @@ public class RMNodeImpl implements RMNod
                  = new StateMachineFactory<RMNodeImpl,
                                            RMNodeState,
                                            RMNodeEventType,
-                                           RMNodeEvent>(RMNodeState.RUNNING)
+                                           RMNodeEvent>(RMNodeState.NEW)
   
      //Transitions from RUNNING state
+     .addTransition(RMNodeState.NEW, RMNodeState.RUNNING, 
+         RMNodeEventType.STARTED, new AddNodeTransition())
      .addTransition(RMNodeState.RUNNING, 
          EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -158,8 +161,6 @@ public class RMNodeImpl implements RMNod
 
     this.stateMachine = stateMachineFactory.make(this);
     
-    context.getDispatcher().getEventHandler().handle(
-        new NodeAddedSchedulerEvent(this));
   }
 
   @Override
@@ -311,6 +312,21 @@ public class RMNodeImpl implements RMNod
     }
   }
   
+  public static class AddNodeTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Inform the scheduler
+
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodeAddedSchedulerEvent(rmNode));
+
+      ClusterMetrics.getMetrics().addNode();
+    }
+  }
+  
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -335,6 +351,7 @@ public class RMNodeImpl implements RMNod
   public static class RemoveNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
@@ -345,11 +362,14 @@ public class RMNodeImpl implements RMNod
       rmNode.context.getRMNodes().remove(rmNode.nodeId);
       LOG.info("Removed Node " + rmNode.nodeId);
       
+      //Update the metrics 
+      ClusterMetrics.getMetrics().removeNode(event.getType());
     }
   }
 
   public static class StatusUpdateWhenHealthyTransition implements
       MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
+    @SuppressWarnings("unchecked")
     @Override
     public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
@@ -365,6 +385,7 @@ public class RMNodeImpl implements RMNod
         // Inform the scheduler
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
+        ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
         return RMNodeState.UNHEALTHY;
       }
 
@@ -402,6 +423,7 @@ public class RMNodeImpl implements RMNod
  implements
       MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
 
+    @SuppressWarnings("unchecked")
     @Override
     public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
@@ -413,6 +435,7 @@ public class RMNodeImpl implements RMNod
       if (remoteNodeHealthStatus.getIsNodeHealthy()) {
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeAddedSchedulerEvent(rmNode));
+        ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
         return RMNodeState.RUNNING;
       }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java Fri Oct 28 17:40:45 2011
@@ -19,5 +19,5 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 public enum RMNodeState {
-  RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
+  NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java Fri Oct 28 17:40:45 2011
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -60,6 +61,7 @@ public class MetricsOverviewTable extend
     
     ResourceScheduler rs = rm.getResourceScheduler();
     QueueMetrics metrics = rs.getRootQueueMetrics();
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
     
     int appsSubmitted = metrics.getAppsSubmitted();
     int reservedGB = metrics.getReservedGB();
@@ -67,30 +69,13 @@ public class MetricsOverviewTable extend
     int allocatedGB = metrics.getAllocatedGB();
     int containersAllocated = metrics.getAllocatedContainers();
     int totalGB = availableGB + reservedGB + allocatedGB;
-    
-    ConcurrentMap<NodeId,RMNode> nodes = rmContext.getRMNodes();
-    int totalNodes = nodes.size();
-    int lostNodes = 0;
-    int unhealthyNodes = 0;
-    int decommissionedNodes = 0;
-    for(RMNode node: nodes.values()) {
-      if(node == null || node.getState() == null) {
-        lostNodes++;
-        continue;
-      }
-      switch(node.getState()) {
-      case DECOMMISSIONED:
-        decommissionedNodes++;
-        break;
-      case LOST:
-        lostNodes++;
-        break;
-      case UNHEALTHY:
-        unhealthyNodes++;
-        break;
-      //RUNNING noop
-      }
-    }
+
+    int totalNodes = clusterMetrics.getNumNMs();
+    int lostNodes = clusterMetrics.getNumLostNMs();
+    int unhealthyNodes = clusterMetrics.getUnhealthyNMs();
+    int decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
+    int rebootedNodes = clusterMetrics.getNumRebootedNMs();
+
     
     DIV<Hamlet> div = html.div().$class("metrics");
     
@@ -106,6 +91,7 @@ public class MetricsOverviewTable extend
         th().$class("ui-state-default")._("Decommissioned Nodes")._().
         th().$class("ui-state-default")._("Lost Nodes")._().
         th().$class("ui-state-default")._("Unhealthy Nodes")._().
+        th().$class("ui-state-default")._("Rebooted Nodes")._().
       _().
     _().
     tbody().$class("ui-widget-content").
@@ -116,9 +102,10 @@ public class MetricsOverviewTable extend
         td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)).
         td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)).
         td().a(url("nodes"),String.valueOf(totalNodes))._(). 
-        td().a(url("nodes/DECOMMISSIONED"),String.valueOf(decommissionedNodes))._(). 
-        td().a(url("nodes/LOST"),String.valueOf(lostNodes))._().
-        td().a(url("nodes/UNHEALTHY"),String.valueOf(unhealthyNodes))._().
+        td().a(url("nodes/decommissioned"),String.valueOf(decommissionedNodes))._(). 
+        td().a(url("nodes/lost"),String.valueOf(lostNodes))._().
+        td().a(url("nodes/unhealthy"),String.valueOf(unhealthyNodes))._().
+        td().a(url("nodes/rebooted"),String.valueOf(rebootedNodes))._().
       _().
     _()._();
     

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Oct 28 17:40:45 2011
@@ -63,7 +63,7 @@ public class MockNM {
         new HashMap<ApplicationId, List<ContainerStatus>>();
     conts.put(container.getId().getApplicationAttemptId().getApplicationId(), 
         Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
-    nodeHeartbeat(conts, true);
+    nodeHeartbeat(conts, true,nodeId);
   }
 
   public NodeId registerNode() throws Exception {
@@ -83,11 +83,11 @@ public class MockNM {
   }
 
   public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
+    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
   }
 
   public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
-      List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
+      List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Oct 28 17:40:45 2011
@@ -220,6 +220,10 @@ public class MockRM extends ResourceMana
       }
     };
   }
+  
+  public NodesListManager getNodesListManager() {
+    return this.nodesListManager;
+  }
 
   @Override
   protected void startWepApp() {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Fri Oct 28 17:40:45 2011
@@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Fri Oct 28 17:40:45 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
@@ -34,12 +32,13 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -55,8 +54,6 @@ public class TestNMExpiry {
   ResourceTrackerService resourceTrackerService;
   ContainerTokenSecretManager containerTokenSecretManager = 
     new ContainerTokenSecretManager();
-  AtomicInteger test = new AtomicInteger();
-  AtomicInteger notify = new AtomicInteger();
 
   private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
     public TestNmLivelinessMonitor(Dispatcher dispatcher) {
@@ -68,22 +65,6 @@ public class TestNMExpiry {
       conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
       super.init(conf);
     }
-    @Override
-    protected void expire(NodeId id) {
-        LOG.info("Expired  " + id);
-        if (test.addAndGet(1) == 2) {
-          try {
-            /* delay atleast 2 seconds to make sure the 3rd one does not expire
-             * 
-             */
-            Thread.sleep(2000);
-          } catch(InterruptedException ie){}
-          synchronized(notify) {
-            notify.addAndGet(1);
-            notify.notifyAll();
-          }
-        }
-    }
   }
 
   @Before
@@ -91,12 +72,12 @@ public class TestNMExpiry {
     Configuration conf = new Configuration();
     // Dispatcher that processes events inline
     Dispatcher dispatcher = new InlineDispatcher();
+    RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
+        null, null);
     dispatcher.register(SchedulerEventType.class,
         new InlineDispatcher.EmptyEventHandler());
     dispatcher.register(RMNodeEventType.class,
-        new InlineDispatcher.EmptyEventHandler());
-    RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
-        null, null);
+        new NodeEventDispatcher(context));
     NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
         dispatcher);
     nmLivelinessMonitor.init(conf);
@@ -166,6 +147,14 @@ public class TestNMExpiry {
     request2.setHttpPort(0);
     request2.setResource(capability);
     resourceTrackerService.registerNodeManager(request2);
+    
+    int waitCount = 0;
+    while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){
+      synchronized (this) {
+        wait(100);
+      }
+    }
+    Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs());
 
     request3 = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);
@@ -175,20 +164,13 @@ public class TestNMExpiry {
     request3.setNodeId(nodeId3);
     request3.setHttpPort(0);
     request3.setResource(capability);
-    RegistrationResponse thirdNodeRegResponse = resourceTrackerService
+    resourceTrackerService
         .registerNodeManager(request3).getRegistrationResponse();
 
     /* test to see if hostanme 3 does not expire */
     stopT = false;
     new ThirdNodeHeartBeatThread().start();
-    int timeOut = 0;
-    synchronized (notify) {
-      while (notify.get() == 0 && timeOut++ < 30) {
-        notify.wait(1000);
-      }
-    }
-    Assert.assertEquals(2, test.get()); 
-
+    Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs());
     stopT = true;
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Fri Oct 28 17:40:45 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -130,6 +131,6 @@ public class TestRMNMRPCResponseId {
     nodeStatus.setResponseId(0);
     response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
         .getHeartbeatResponse();
-    Assert.assertTrue(response.getReboot() == true);
+    Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java Fri Oct 28 17:40:45 2011
@@ -43,7 +43,7 @@ public class TestNodesPage {
     final int numberOfNodesPerRack = 2;
     // Number of Actual Table Headers for NodesPage.NodesBlock might change in
     // future. In that case this value should be adjusted to the new value.
-    final int numberOfThInMetricsTable = 9;
+    final int numberOfThInMetricsTable = 10;
     final int numberOfActualTableHeaders = 10;
 
     Injector injector = WebAppTests.createMockInjector(RMContext.class,