You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-issues@hadoop.apache.org by "Xuan Gong (JIRA)" <ji...@apache.org> on 2013/03/26 19:05:16 UTC
[jira] [Assigned] (YARN-101) If a heartbeat message is lost, the
node status info of completed containers is lost too.
[ https://issues.apache.org/jira/browse/YARN-101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xuan Gong reassigned YARN-101:
------------------------------
Assignee: Xuan Gong
> If a heartbeat message is lost, the node status info of completed containers is lost too.
> ----------------------------------------------------------------------------------------
>
> Key: YARN-101
> URL: https://issues.apache.org/jira/browse/YARN-101
> Project: Hadoop YARN
> Issue Type: Bug
> Components: nodemanager
>             Environment: SUSE Linux.
> Reporter: xieguiming
> Assignee: Xuan Gong
> Priority: Minor
> Attachments: YARN-101.1.patch
>
>
> See the sections highlighted in red (enclosed in {color:red} ... {color} markup below):
> org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java
> protected void startStatusUpdater() {
> new Thread("Node Status Updater") {
> @Override
> @SuppressWarnings("unchecked")
> public void run() {
> int lastHeartBeatID = 0;
> while (!isStopped) {
> // Send heartbeat
> try {
> synchronized (heartbeatMonitor) {
> heartbeatMonitor.wait(heartBeatInterval);
> }
> {color:red}
> // Before we send the heartbeat, we get the NodeStatus,
> // whose method removes completed containers.
> NodeStatus nodeStatus = getNodeStatus();
> {color}
> nodeStatus.setResponseId(lastHeartBeatID);
>
> NodeHeartbeatRequest request = recordFactory
> .newRecordInstance(NodeHeartbeatRequest.class);
> request.setNodeStatus(nodeStatus);
> {color:red}
> // But if the nodeHeartbeat fails, we've already removed the containers away to know about it. We aren't handling a nodeHeartbeat failure case here.
> HeartbeatResponse response =
> resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
> {color}
> if (response.getNodeAction() == NodeAction.SHUTDOWN) {
> LOG
> .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
> " hence shutting down.");
> NodeStatusUpdaterImpl.this.stop();
> break;
> }
> if (response.getNodeAction() == NodeAction.REBOOT) {
> LOG.info("Node is out of sync with ResourceManager,"
> + " hence rebooting.");
> NodeStatusUpdaterImpl.this.reboot();
> break;
> }
> lastHeartBeatID = response.getResponseId();
> List<ContainerId> containersToCleanup = response
> .getContainersToCleanupList();
> if (containersToCleanup.size() != 0) {
> dispatcher.getEventHandler().handle(
> new CMgrCompletedContainersEvent(containersToCleanup));
> }
> List<ApplicationId> appsToCleanup =
> response.getApplicationsToCleanupList();
> //Only start tracking for keepAlive on FINISH_APP
> trackAppsForKeepAlive(appsToCleanup);
> if (appsToCleanup.size() != 0) {
> dispatcher.getEventHandler().handle(
> new CMgrCompletedAppsEvent(appsToCleanup));
> }
> } catch (Throwable e) {
> // TODO Better error handling. Thread can die with the rest of the
> // NM still running.
> LOG.error("Caught exception in status-updater", e);
> }
> }
> }
> }.start();
> }
> private NodeStatus getNodeStatus() {
> NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
> nodeStatus.setNodeId(this.nodeId);
> int numActiveContainers = 0;
> List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
> for (Iterator<Entry<ContainerId, Container>> i =
> this.context.getContainers().entrySet().iterator(); i.hasNext();) {
> Entry<ContainerId, Container> e = i.next();
> ContainerId containerId = e.getKey();
> Container container = e.getValue();
> // Clone the container to send it to the RM
> org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
> container.cloneAndGetContainerStatus();
> containersStatuses.add(containerStatus);
> ++numActiveContainers;
> LOG.info("Sending out status for container: " + containerStatus);
> {color:red}
> // Here is the part that removes the completed containers.
> if (containerStatus.getState() == ContainerState.COMPLETE) {
> // Remove
> i.remove();
> {color}
> LOG.info("Removed completed container " + containerId);
> }
> }
> nodeStatus.setContainersStatuses(containersStatuses);
> LOG.debug(this.nodeId + " sending out status for "
> + numActiveContainers + " containers");
> NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
> nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
> nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
> nodeHealthStatus.setLastHealthReportTime(
> healthChecker.getLastHealthReportTime());
> if (LOG.isDebugEnabled()) {
> LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
> + ", " + nodeHealthStatus.getHealthReport());
> }
> nodeStatus.setNodeHealthStatus(nodeHealthStatus);
> List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
> nodeStatus.setKeepAliveApplications(keepAliveAppIds);
>
> return nodeStatus;
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira