You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page (the hyperlink was not preserved in this copy).
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/28 19:40:46 UTC
svn commit: r1190468 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serv...
Author: vinodkv
Date: Fri Oct 28 17:40:45 2011
New Revision: 1190468
URL: http://svn.apache.org/viewvc?rev=1190468&view=rev
Log:
MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a decommissioned node to shut down. Contributed by Devaraj K.
svn merge -c r1190467 --ignore-ancestry ../../trunk/
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
- copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
- copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
- copied unchanged from r1190467, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 28 17:40:45 2011
@@ -1786,6 +1786,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv)
+ MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a
+ decommissioned node to shutdown. (Devaraj K via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java Fri Oct 28 17:40:45 2011
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.record
public interface HeartbeatResponse {
int getResponseId();
- boolean getReboot();
+ NodeAction getNodeAction();
List<ContainerId> getContainersToCleanupList();
ContainerId getContainerToCleanup(int index);
@@ -35,7 +35,7 @@ public interface HeartbeatResponse {
int getApplicationsToCleanupCount();
void setResponseId(int responseId);
- void setReboot(boolean reboot);
+ void setNodeAction(NodeAction action);
void addAllContainersToCleanup(List<ContainerId> containers);
void addContainerToCleanup(ContainerId container);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java Fri Oct 28 17:40:45 2011
@@ -23,4 +23,8 @@ public interface RegistrationResponse {
public abstract ByteBuffer getSecretKey();
public abstract void setSecretKey(ByteBuffer secretKey);
+
+ public abstract NodeAction getNodeAction();
+
+ public abstract void setNodeAction(NodeAction nodeAction);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java Fri Oct 28 17:40:45 2011
@@ -32,11 +32,12 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
-
-
-public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
+public class HeartbeatResponsePBImpl extends
+ ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
HeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
@@ -100,16 +101,24 @@ public class HeartbeatResponsePBImpl ext
builder.setResponseId((responseId));
}
@Override
- public boolean getReboot() {
+ public NodeAction getNodeAction() {
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- return (p.getReboot());
+ if(!p.hasNodeAction()) {
+ return null;
+ }
+ return (convertFromProtoFormat(p.getNodeAction()));
}
@Override
- public void setReboot(boolean reboot) {
+ public void setNodeAction(NodeAction nodeAction) {
maybeInitBuilder();
- builder.setReboot((reboot));
+ if (nodeAction == null) {
+ builder.clearNodeAction();
+ return;
+ }
+ builder.setNodeAction(convertToProtoFormat(nodeAction));
}
+
@Override
public List<ContainerId> getContainersToCleanupList() {
initContainersToCleanup();
@@ -296,7 +305,12 @@ public class HeartbeatResponsePBImpl ext
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
-
-
-
+
+ private NodeAction convertFromProtoFormat(NodeActionProto p) {
+ return NodeAction.valueOf(p.name());
+ }
+
+ private NodeActionProto convertToProtoFormat(NodeAction t) {
+ return NodeActionProto.valueOf(t.name());
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java Fri Oct 28 17:40:45 2011
@@ -21,17 +21,15 @@ package org.apache.hadoop.yarn.server.ap
import java.nio.ByteBuffer;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
-
-
-public class RegistrationResponsePBImpl extends ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
+public class RegistrationResponsePBImpl extends
+ ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
RegistrationResponseProto.Builder builder = null;
boolean viaProto = false;
@@ -98,4 +96,31 @@ public class RegistrationResponsePBImpl
this.secretKey = secretKey;
}
+ @Override
+ public NodeAction getNodeAction() {
+ RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if(!p.hasNodeAction()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getNodeAction());
+ }
+
+ @Override
+ public void setNodeAction(NodeAction nodeAction) {
+ maybeInitBuilder();
+ if (nodeAction == null) {
+ builder.clearNodeAction();
+ return;
+ }
+ builder.setNodeAction(convertToProtoFormat(nodeAction));
+ }
+
+ private NodeAction convertFromProtoFormat(NodeActionProto p) {
+ return NodeAction.valueOf(p.name());
+ }
+
+ private NodeActionProto convertToProtoFormat(NodeAction t) {
+ return NodeActionProto.valueOf(t.name());
+ }
+
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Fri Oct 28 17:40:45 2011
@@ -23,6 +23,12 @@ option java_generate_equals_and_hash = t
import "yarn_protos.proto";
+enum NodeActionProto {
+ NORMAL = 0;
+ REBOOT = 1;
+ SHUTDOWN = 2;
+}
+
message NodeStatusProto {
optional NodeIdProto node_id = 1;
optional int32 response_id = 2;
@@ -32,11 +38,12 @@ message NodeStatusProto {
message RegistrationResponseProto {
optional bytes secret_key = 1;
+ optional NodeActionProto nodeAction = 2;
}
message HeartbeatResponseProto {
optional int32 response_id = 1;
- optional bool reboot = 2;
+ optional NodeActionProto nodeAction = 2;
repeated ContainerIdProto containers_to_cleanup = 3;
repeated ApplicationIdProto applications_to_cleanup = 4;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri Oct 28 17:40:45 2011
@@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.server.sec
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.util.Records;
-public class NodeManager extends CompositeService {
+public class NodeManager extends CompositeService implements
+ ServiceStateChangeListener {
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
@@ -123,6 +125,8 @@ public class NodeManager extends Composi
NodeStatusUpdater nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, healthChecker,
this.containerTokenSecretManager);
+
+ nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
@@ -206,6 +210,16 @@ public class NodeManager extends Composi
}
}
+
+ @Override
+ public void stateChanged(Service service) {
+ // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
+ if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
+ && STATE.STOPPED.equals(service.getServiceState())) {
+ stop();
+ }
+ }
+
public static void main(String[] args) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
try {
@@ -220,5 +234,4 @@ public class NodeManager extends Composi
System.exit(-1);
}
}
-
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Fri Oct 28 17:40:45 2011
@@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -160,6 +160,12 @@ public class NodeStatusUpdaterImpl exten
request.setNodeId(this.nodeId);
RegistrationResponse regResponse =
this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+ // if the Resourcemanager instructs NM to shutdown.
+ if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+ throw new YarnException(
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+ }
+
if (UserGroupInformation.isSecurityEnabled()) {
this.secretKeyBytes = regResponse.getSecretKey().array();
}
@@ -248,10 +254,25 @@ public class NodeStatusUpdaterImpl exten
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
- NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+ NodeHeartbeatRequest request = recordFactory
+ .newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+ if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+ LOG
+ .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
+ " hence shutting down.");
+ NodeStatusUpdaterImpl.this.stop();
+ break;
+ }
+ if (response.getNodeAction() == NodeAction.REBOOT) {
+ LOG.info("Node is out of sync with ResourceManager,"
+ + " hence shutting down.");
+ NodeStatusUpdaterImpl.this.stop();
+ break;
+ }
+
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanupList();
@@ -269,7 +290,6 @@ public class NodeStatusUpdaterImpl exten
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
- break;
}
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Fri Oct 28 17:40:45 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -85,10 +86,15 @@ public class TestNodeStatusUpdater {
volatile Error nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = new YarnConfiguration();
+ private NodeManager nm;
@After
public void tearDown() {
this.registeredNodes.clear();
+ heartBeatID = 0;
+ if (nm != null) {
+ nm.stop();
+ }
DefaultMetricsSystem.shutdown();
}
@@ -220,6 +226,7 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@@ -232,10 +239,44 @@ public class TestNodeStatusUpdater {
@Override
protected ResourceTracker getRMClient() {
- return new MyResourceTracker(this.context);
+ return resourceTracker;
}
}
+
+ //
+ private class MyResourceTracker2 implements ResourceTracker {
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ RegistrationResponse regResponse = recordFactory
+ .newRecordInstance(RegistrationResponse.class);
+ regResponse.setNodeAction(registerNodeAction );
+ response.setRegistrationResponse(regResponse);
+ return response;
+ }
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+ HeartbeatResponse response = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(heartBeatID);
+ response.setNodeAction(heartBeatNodeAction);
+
+ NodeHeartbeatResponse nhResponse = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+ nhResponse.setHeartbeatResponse(response);
+ return nhResponse;
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -249,7 +290,7 @@ public class TestNodeStatusUpdater {
@Test
public void testNMRegistration() throws InterruptedException {
- final NodeManager nm = new NodeManager() {
+ nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -295,14 +336,85 @@ public class TestNodeStatusUpdater {
Assert.fail("NodeManager failed to start");
}
- while (heartBeatID <= 3) {
+ waitCount = 0;
+ while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
+ Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
nm.stop();
}
+
+ @Test
+ public void testNodeDecommision() throws Exception {
+ nm = getNodeManager(NodeAction.SHUTDOWN);
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ Assert.assertEquals(STATE.INITED, nm.getServiceState());
+ nm.start();
+
+ int waitCount = 0;
+ while (heartBeatID < 1 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID < 1);
+
+ // NM takes a while to reach the STOPPED state.
+ waitCount = 0;
+ while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to stop..");
+ Thread.sleep(1000);
+ }
+
+ Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+ }
+
+ @Test
+ public void testNodeReboot() throws Exception {
+ nm = getNodeManager(NodeAction.REBOOT);
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ Assert.assertEquals(STATE.INITED, nm.getServiceState());
+ nm.start();
+
+ int waitCount = 0;
+ while (heartBeatID < 1 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID < 1);
+
+ // NM takes a while to reach the STOPPED state.
+ waitCount = 0;
+ while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to stop..");
+ Thread.sleep(1000);
+ }
+
+ Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+ }
+
+ @Test
+ public void testNMShutdownForRegistrationFailure() {
+
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+ myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
+ nodeStatusUpdater.resourceTracker = myResourceTracker2;
+ return nodeStatusUpdater;
+ }
+ };
+ verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+ + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+ }
/**
* Verifies that if for some reason NM fails to start ContainerManager RPC
@@ -314,7 +426,7 @@ public class TestNodeStatusUpdater {
@Test
public void testNoRegistrationWhenNMServicesFail() {
- final NodeManager nm = new NodeManager() {
+ nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -341,16 +453,22 @@ public class TestNodeStatusUpdater {
}
};
+ verifyNodeStartFailure("Starting of RPC Server failed");
+ }
+
+ private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
try {
nm.start();
Assert.fail("NM should have failed to start. Didn't get exception!!");
} catch (Exception e) {
- Assert.assertEquals("Starting of RPC Server failed", e.getCause()
+ Assert.assertEquals(errMessage, e.getCause()
.getMessage());
}
-
+
+ // the state change to stopped occurs only if the startup is success, else
+ // state change doesn't occur
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
.getServiceState());
@@ -371,4 +489,21 @@ public class TestNodeStatusUpdater {
.toUri().getPath());
return conf;
}
+ // Builds an NM whose stub MyResourceTracker2 is given nodeHeartBeatAction as its heartbeat reply action (assumed returned on every heartbeat).
+ private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
+ return new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+ myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
+ myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+ return myNodeStatusUpdater;
+ }
+ };
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Fri Oct 28 17:40:45 2011
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
@@ -76,11 +75,19 @@ public class ResourceTrackerService exte
private static final NodeHeartbeatResponse reboot = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
+ private static final NodeHeartbeatResponse shutDown = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+
static {
HeartbeatResponse rebootResp = recordFactory
.newRecordInstance(HeartbeatResponse.class);
- rebootResp.setReboot(true);
+ rebootResp.setNodeAction(NodeAction.REBOOT);
reboot.setHeartbeatResponse(rebootResp);
+
+ HeartbeatResponse decommissionedResp = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
+ shutDown.setHeartbeatResponse(decommissionedResp);
}
public ResourceTrackerService(RMContext rmContext,
@@ -139,6 +146,7 @@ public class ResourceTrackerService exte
super.stop();
}
+ @SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
@@ -149,121 +157,125 @@ public class ResourceTrackerService exte
int httpPort = request.getHttpPort();
Resource capability = request.getResource();
- try {
- // Check if this node is a 'valid' node
- if (!this.nodesListManager.isValidNode(host)) {
- LOG.info("Disallowed NodeManager from " + host);
- throw new IOException("Disallowed NodeManager from " + host);
- }
-
- RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
- httpPort, resolve(host), capability);
-
- if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
- throw new IOException("Duplicate registration from the node!");
- }
-
- this.nmLivelinessMonitor.register(nodeId);
-
- LOG.info("NodeManager from node " + host +
- "(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
- + "registered with capability: " + capability.getMemory()
- + ", assigned nodeId " + nodeId);
-
- RegistrationResponse regResponse = recordFactory.newRecordInstance(
- RegistrationResponse.class);
- SecretKey secretKey = this.containerTokenSecretManager
- .createAndGetSecretKey(nodeId.toString());
- regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ RegistrationResponse regResponse = recordFactory
+ .newRecordInstance(RegistrationResponse.class);
+ SecretKey secretKey = this.containerTokenSecretManager
+ .createAndGetSecretKey(nodeId.toString());
+ regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+
+ // Check if this node is a 'valid' node
+ if (!this.nodesListManager.isValidNode(host)) {
+ LOG.info("Disallowed NodeManager from " + host
+ + ", Sending SHUTDOWN signal to the NodeManager.");
+ regResponse.setNodeAction(NodeAction.SHUTDOWN);
+ response.setRegistrationResponse(regResponse);
+ return response;
+ }
+
+ RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
+ resolve(host), capability);
- RegisterNodeManagerResponse response = recordFactory
- .newRecordInstance(RegisterNodeManagerResponse.class);
+ if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
+ LOG.info("Duplicate registration from the node at: " + host
+ + ", Sending SHUTDOWN Signal to the NodeManager");
+ regResponse.setNodeAction(NodeAction.SHUTDOWN);
response.setRegistrationResponse(regResponse);
return response;
- } catch (IOException ioe) {
- LOG.info("Exception in node registration from " + nodeId.getHost(), ioe);
- throw RPCUtil.getRemoteException(ioe);
}
+
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+
+ this.nmLivelinessMonitor.register(nodeId);
+
+ LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
+ + " httpPort: " + httpPort + ") " + "registered with capability: "
+ + capability.getMemory() + ", assigned nodeId " + nodeId);
+
+ regResponse.setNodeAction(NodeAction.NORMAL);
+ response.setRegistrationResponse(regResponse);
+ return response;
}
+ @SuppressWarnings("unchecked")
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
NodeStatus remoteNodeStatus = request.getNodeStatus();
- try {
- /**
- * Here is the node heartbeat sequence...
- * 1. Check if it's a registered node
- * 2. Check if it's a valid (i.e. not excluded) node
- * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
- * 4. Send healthStatus to RMNode
- */
+ /**
+ * Here is the node heartbeat sequence...
+ * 1. Check if it's a registered node
+ * 2. Check if it's a valid (i.e. not excluded) node
+ * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+ * 4. Send healthStatus to RMNode
+ */
+
+ NodeId nodeId = remoteNodeStatus.getNodeId();
+
+ // 1. Check if it's a registered node
+ RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+ if (rmNode == null) {
+ /* node does not exist */
+ LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
- NodeId nodeId = remoteNodeStatus.getNodeId();
-
- // 1. Check if it's a registered node
- RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
- if (rmNode == null) {
- /* node does not exist */
- LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
- return reboot;
- }
-
- // Send ping
- this.nmLivelinessMonitor.receivedPing(nodeId);
-
- // 2. Check if it's a valid (i.e. not excluded) node
- if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
- LOG.info("Disallowed NodeManager nodeId: " + nodeId +
- " hostname: " + rmNode.getNodeAddress());
- throw new IOException("Disallowed NodeManager nodeId: " +
- remoteNodeStatus.getNodeId());
- }
-
- NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
- .newRecordInstance(NodeHeartbeatResponse.class);
-
- // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
- HeartbeatResponse lastHeartbeatResponse = rmNode
- .getLastHeartBeatResponse();
- if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
- .getResponseId()) {
- LOG.info("Received duplicate heartbeat from node " +
- rmNode.getNodeAddress());
- nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
- return nodeHeartBeatResponse;
- } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
- .getResponseId()) {
- LOG.info("Too far behind rm response id:" +
- lastHeartbeatResponse.getResponseId() + " nm response id:"
- + remoteNodeStatus.getResponseId());
- // TODO: Just sending reboot is not enough. Think more.
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
- return reboot;
- }
-
- // Heartbeat response
- HeartbeatResponse latestResponse = recordFactory
- .newRecordInstance(HeartbeatResponse.class);
- latestResponse
- .setResponseId(lastHeartbeatResponse.getResponseId() + 1);
- latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
- latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+ // Updating the metrics directly as reboot event cannot be
+ // triggered on a null rmNode
+ ClusterMetrics.getMetrics().incrNumRebootedNMs();
+ return reboot;
+ }
+
+ // Send ping
+ this.nmLivelinessMonitor.receivedPing(nodeId);
- // 4. Send status to RMNode, saving the latest response.
+ // 2. Check if it's a valid (i.e. not excluded) node
+ if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
+ LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ + rmNode.getNodeAddress());
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
- remoteNodeStatus.getContainersStatuses(), latestResponse));
+ new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ return shutDown;
+ }
+
+ NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
- nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
+ // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+ HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
+ if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
+ .getResponseId()) {
+ LOG.info("Received duplicate heartbeat from node "
+ + rmNode.getNodeAddress());
+ nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
return nodeHeartBeatResponse;
- } catch (IOException ioe) {
- LOG.info("Exception in heartbeat from node " +
- request.getNodeStatus().getNodeId(), ioe);
- throw RPCUtil.getRemoteException(ioe);
+ } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
+ .getResponseId()) {
+ LOG.info("Too far behind rm response id:"
+ + lastHeartbeatResponse.getResponseId() + " nm response id:"
+ + remoteNodeStatus.getResponseId());
+ // TODO: Just sending reboot is not enough. Think more.
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
+ return reboot;
}
+
+ // Heartbeat response
+ HeartbeatResponse latestResponse = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
+ latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
+ latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+ latestResponse.setNodeAction(NodeAction.NORMAL);
+
+ // 4. Send status to RMNode, saving the latest response.
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
+ remoteNodeStatus.getContainersStatuses(), latestResponse));
+
+ nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
+ return nodeHeartBeatResponse;
}
public void recover(RMState state) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java Fri Oct 28 17:40:45 2011
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
public enum RMNodeEventType {
+
+ STARTED,
+
// Source: AdminService
DECOMMISSION,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Fri Oct 28 17:40:45 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -107,9 +108,11 @@ public class RMNodeImpl implements RMNod
= new StateMachineFactory<RMNodeImpl,
RMNodeState,
RMNodeEventType,
- RMNodeEvent>(RMNodeState.RUNNING)
+ RMNodeEvent>(RMNodeState.NEW)
//Transitions from RUNNING state
+ .addTransition(RMNodeState.NEW, RMNodeState.RUNNING,
+ RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(RMNodeState.RUNNING,
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -158,8 +161,6 @@ public class RMNodeImpl implements RMNod
this.stateMachine = stateMachineFactory.make(this);
- context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(this));
}
@Override
@@ -311,6 +312,21 @@ public class RMNodeImpl implements RMNod
}
}
+ public static class AddNodeTransition implements
+ SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+ // Handles the NEW -> RUNNING transition triggered by a STARTED event.
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ // Inform the scheduler
+ // (done here rather than in the RMNodeImpl constructor, so the scheduler hears of the node only once STARTED is dispatched)
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodeAddedSchedulerEvent(rmNode));
+ // Record the node in ClusterMetrics (NOTE(review): presumably bumps the NM count read by MetricsOverviewTable).
+ ClusterMetrics.getMetrics().addNode();
+ }
+ }
+
public static class CleanUpAppTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -335,6 +351,7 @@ public class RMNodeImpl implements RMNod
public static class RemoveNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+ @SuppressWarnings("unchecked")
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
@@ -345,11 +362,14 @@ public class RMNodeImpl implements RMNod
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId);
+ //Update the metrics
+ ClusterMetrics.getMetrics().removeNode(event.getType());
}
}
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
+ @SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
@@ -365,6 +385,7 @@ public class RMNodeImpl implements RMNod
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
+ ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
return RMNodeState.UNHEALTHY;
}
@@ -402,6 +423,7 @@ public class RMNodeImpl implements RMNod
implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
+ @SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
@@ -413,6 +435,7 @@ public class RMNodeImpl implements RMNod
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
+ ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
return RMNodeState.RUNNING;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java Fri Oct 28 17:40:45 2011
@@ -19,5 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
public enum RMNodeState {
- RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
+ NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java Fri Oct 28 17:40:45 2011
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -60,6 +61,7 @@ public class MetricsOverviewTable extend
ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics();
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
int appsSubmitted = metrics.getAppsSubmitted();
int reservedGB = metrics.getReservedGB();
@@ -67,30 +69,13 @@ public class MetricsOverviewTable extend
int allocatedGB = metrics.getAllocatedGB();
int containersAllocated = metrics.getAllocatedContainers();
int totalGB = availableGB + reservedGB + allocatedGB;
-
- ConcurrentMap<NodeId,RMNode> nodes = rmContext.getRMNodes();
- int totalNodes = nodes.size();
- int lostNodes = 0;
- int unhealthyNodes = 0;
- int decommissionedNodes = 0;
- for(RMNode node: nodes.values()) {
- if(node == null || node.getState() == null) {
- lostNodes++;
- continue;
- }
- switch(node.getState()) {
- case DECOMMISSIONED:
- decommissionedNodes++;
- break;
- case LOST:
- lostNodes++;
- break;
- case UNHEALTHY:
- unhealthyNodes++;
- break;
- //RUNNING noop
- }
- }
+
+ int totalNodes = clusterMetrics.getNumNMs();
+ int lostNodes = clusterMetrics.getNumLostNMs();
+ int unhealthyNodes = clusterMetrics.getUnhealthyNMs();
+ int decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
+ int rebootedNodes = clusterMetrics.getNumRebootedNMs();
+
DIV<Hamlet> div = html.div().$class("metrics");
@@ -106,6 +91,7 @@ public class MetricsOverviewTable extend
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
+ th().$class("ui-state-default")._("Rebooted Nodes")._().
_().
_().
tbody().$class("ui-widget-content").
@@ -116,9 +102,10 @@ public class MetricsOverviewTable extend
td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)).
td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)).
td().a(url("nodes"),String.valueOf(totalNodes))._().
- td().a(url("nodes/DECOMMISSIONED"),String.valueOf(decommissionedNodes))._().
- td().a(url("nodes/LOST"),String.valueOf(lostNodes))._().
- td().a(url("nodes/UNHEALTHY"),String.valueOf(unhealthyNodes))._().
+ td().a(url("nodes/decommissioned"),String.valueOf(decommissionedNodes))._().
+ td().a(url("nodes/lost"),String.valueOf(lostNodes))._().
+ td().a(url("nodes/unhealthy"),String.valueOf(unhealthyNodes))._().
+ td().a(url("nodes/rebooted"),String.valueOf(rebootedNodes))._().
_().
_()._();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Oct 28 17:40:45 2011
@@ -63,7 +63,7 @@ public class MockNM {
new HashMap<ApplicationId, List<ContainerStatus>>();
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
- nodeHeartbeat(conts, true);
+ nodeHeartbeat(conts, true,nodeId);
}
public NodeId registerNode() throws Exception {
@@ -83,11 +83,11 @@ public class MockNM {
}
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
- return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
+ return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
- List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
+ List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setNodeId(nodeId);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Oct 28 17:40:45 2011
@@ -220,6 +220,10 @@ public class MockRM extends ResourceMana
}
};
}
+ /** Test-only accessor exposing this RM's nodesListManager field. */
+ public NodesListManager getNodesListManager() {
+ return this.nodesListManager;
+ }
@Override
protected void startWepApp() {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Fri Oct 28 17:40:45 2011
@@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Fri Oct 28 17:40:45 2011
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
-import java.util.concurrent.atomic.AtomicInteger;
-
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@@ -34,12 +32,13 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -55,8 +54,6 @@ public class TestNMExpiry {
ResourceTrackerService resourceTrackerService;
ContainerTokenSecretManager containerTokenSecretManager =
new ContainerTokenSecretManager();
- AtomicInteger test = new AtomicInteger();
- AtomicInteger notify = new AtomicInteger();
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
@@ -68,22 +65,6 @@ public class TestNMExpiry {
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
super.init(conf);
}
- @Override
- protected void expire(NodeId id) {
- LOG.info("Expired " + id);
- if (test.addAndGet(1) == 2) {
- try {
- /* delay atleast 2 seconds to make sure the 3rd one does not expire
- *
- */
- Thread.sleep(2000);
- } catch(InterruptedException ie){}
- synchronized(notify) {
- notify.addAndGet(1);
- notify.notifyAll();
- }
- }
- }
}
@Before
@@ -91,12 +72,12 @@ public class TestNMExpiry {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
+ RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
+ null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,
- new InlineDispatcher.EmptyEventHandler());
- RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
- null, null);
+ new NodeEventDispatcher(context));
NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
dispatcher);
nmLivelinessMonitor.init(conf);
@@ -166,6 +147,14 @@ public class TestNMExpiry {
request2.setHttpPort(0);
request2.setResource(capability);
resourceTrackerService.registerNodeManager(request2);
+
+ int waitCount = 0;
+ while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){
+ synchronized (this) {
+ wait(100);
+ }
+ }
+ Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs());
request3 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
@@ -175,20 +164,13 @@ public class TestNMExpiry {
request3.setNodeId(nodeId3);
request3.setHttpPort(0);
request3.setResource(capability);
- RegistrationResponse thirdNodeRegResponse = resourceTrackerService
+ resourceTrackerService
.registerNodeManager(request3).getRegistrationResponse();
/* test to see if hostanme 3 does not expire */
stopT = false;
new ThirdNodeHeartBeatThread().start();
- int timeOut = 0;
- synchronized (notify) {
- while (notify.get() == 0 && timeOut++ < 30) {
- notify.wait(1000);
- }
- }
- Assert.assertEquals(2, test.get());
-
+ Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs());
stopT = true;
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Fri Oct 28 17:40:45 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -130,6 +131,6 @@ public class TestRMNMRPCResponseId {
nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
.getHeartbeatResponse();
- Assert.assertTrue(response.getReboot() == true);
+ Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
}
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java?rev=1190468&r1=1190467&r2=1190468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java Fri Oct 28 17:40:45 2011
@@ -43,7 +43,7 @@ public class TestNodesPage {
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
- final int numberOfThInMetricsTable = 9;
+ final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
Injector injector = WebAppTests.createMockInjector(RMContext.class,