You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-dev@hadoop.apache.org by "xieguiming (Created) (JIRA)" <ji...@apache.org> on 2012/03/30 07:27:23 UTC
[jira] [Created] (MAPREDUCE-4086) If the heartbeat message loss,
the nodestatus info of complete container will loss too.
If the heartbeat message loss, the nodestatus info of complete container will loss too.
----------------------------------------------------------------------------------------
Key: MAPREDUCE-4086
URL: https://issues.apache.org/jira/browse/MAPREDUCE-4086
Project: Hadoop Map/Reduce
Issue Type: Bug
Components: nodemanager
Affects Versions: 0.23.1
Environment: suse.
Reporter: xieguiming
Priority: Minor
see the red color:
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java
protected void startStatusUpdater() {
new Thread("Node Status Updater") {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
// Send heartbeat
try {
synchronized (heartbeatMonitor) {
heartbeatMonitor.wait(heartBeatInterval);
}
{color:red}
// before send the heartbeat, and will get the node status,
// and will remove the complete container
NodeStatus nodeStatus = getNodeStatus();
{color}
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
{color:red}
// if the nodeHeartbeat failed, but the complete container remove
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
{color}
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
" hence shutting down.");
NodeStatusUpdaterImpl.this.stop();
break;
}
if (response.getNodeAction() == NodeAction.REBOOT) {
LOG.info("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
NodeStatusUpdaterImpl.this.reboot();
break;
}
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanupList();
if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
}
}
}
}.start();
}
private NodeStatus getNodeStatus() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
ContainerId containerId = e.getKey();
Container container = e.getValue();
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus);
++numActiveContainers;
LOG.info("Sending out status for container: " + containerStatus);
{color:red}
//if the nodeHeartbeat fail, and the complete container will remove.
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
{color}
LOG.info("Removed completed container " + containerId);
}
}
nodeStatus.setContainersStatuses(containersStatuses);
LOG.debug(this.nodeId + " sending out status for "
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus;
}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira