You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-dev@hadoop.apache.org by "xieguiming (Created) (JIRA)" <ji...@apache.org> on 2012/03/30 07:27:23 UTC

[jira] [Created] (MAPREDUCE-4086) If the heartbeat message loss, the nodestatus info of complete container will loss too.

If  the heartbeat message loss, the nodestatus info of complete container will loss too.
----------------------------------------------------------------------------------------

                 Key: MAPREDUCE-4086
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4086
             Project: Hadoop Map/Reduce
          Issue Type: Bug
          Components: nodemanager
    Affects Versions: 0.23.1
         Environment: suse.
            Reporter: xieguiming
            Priority: Minor


see the red color:

org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java

 protected void startStatusUpdater() {

    new Thread("Node Status Updater") {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            synchronized (heartbeatMonitor) {
              heartbeatMonitor.wait(heartBeatInterval);
            }
        {color:red} 
            // before send the heartbeat, and will get the node status,
            // and will remove the complete container
            NodeStatus nodeStatus = getNodeStatus();
         {color}
            nodeStatus.setResponseId(lastHeartBeatID);
            
            NodeHeartbeatRequest request = recordFactory
                .newRecordInstance(NodeHeartbeatRequest.class);
            request.setNodeStatus(nodeStatus);   
            {color:red} 

           // if the nodeHeartbeat failed, but the complete container remove
            HeartbeatResponse response =
              resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
           {color} 

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
                  .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
                  		" hence shutting down.");
              NodeStatusUpdaterImpl.this.stop();
              break;
            }
            if (response.getNodeAction() == NodeAction.REBOOT) {
              LOG.info("Node is out of sync with ResourceManager,"
                  + " hence rebooting.");
              NodeStatusUpdaterImpl.this.reboot();
              break;
            }

            lastHeartBeatID = response.getResponseId();
            List<ContainerId> containersToCleanup = response
                .getContainersToCleanupList();
            if (containersToCleanup.size() != 0) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup));
            }
            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanupList();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (appsToCleanup.size() != 0) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup));
            }
          } catch (Throwable e) {
            // TODO Better error handling. Thread can die with the rest of the
            // NM still running.
            LOG.error("Caught exception in status-updater", e);
          }
        }
      }
    }.start();
  }



  private NodeStatus getNodeStatus() {

    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
    nodeStatus.setNodeId(this.nodeId);

    int numActiveContainers = 0;
    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
    for (Iterator<Entry<ContainerId, Container>> i =
        this.context.getContainers().entrySet().iterator(); i.hasNext();) {
      Entry<ContainerId, Container> e = i.next();
      ContainerId containerId = e.getKey();
      Container container = e.getValue();

      // Clone the container to send it to the RM
      org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = 
          container.cloneAndGetContainerStatus();
      containersStatuses.add(containerStatus);
      ++numActiveContainers;
      LOG.info("Sending out status for container: " + containerStatus);
      {color:red} 

      //if the nodeHeartbeat fail, and the complete container will remove.
      if (containerStatus.getState() == ContainerState.COMPLETE) {
        // Remove
        i.remove();
      {color} 

        LOG.info("Removed completed container " + containerId);
      }
    }
    nodeStatus.setContainersStatuses(containersStatuses);

    LOG.debug(this.nodeId + " sending out status for "
        + numActiveContainers + " containers");

    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
    nodeHealthStatus.setLastHealthReportTime(
        healthChecker.getLastHealthReportTime());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
                + ", " + nodeHealthStatus.getHealthReport());
    }
    nodeStatus.setNodeHealthStatus(nodeHealthStatus);

    List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
    nodeStatus.setKeepAliveApplications(keepAliveAppIds);
    
    return nodeStatus;
  }


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira