You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:28:50 UTC

[11/37] hadoop git commit: YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang

YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff8378eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff8378eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff8378eb

Branch: refs/heads/HDFS-7240
Commit: ff8378eb1b960c72d18a984c7e5d145b407ca11a
Parents: 16be42d
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 25 17:47:19 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 25 17:47:19 2018 -0600

----------------------------------------------------------------------
 .../yarn/sls/nodemanager/NMSimulator.java       |  4 +-
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   | 11 +---
 .../yarn/sls/scheduler/RMNodeWrapper.java       | 12 +---
 .../resourcemanager/ResourceTrackerService.java | 67 ++++++++++++++------
 .../server/resourcemanager/rmnode/RMNode.java   | 13 ++--
 .../resourcemanager/rmnode/RMNodeImpl.java      | 47 ++++----------
 .../rmnode/RMNodeStatusEvent.java               | 13 +---
 .../yarn/server/resourcemanager/MockNM.java     | 18 ++++--
 .../yarn/server/resourcemanager/MockNodes.java  |  9 +--
 .../resourcemanager/TestRMNodeTransitions.java  | 14 +---
 .../TestResourceTrackerService.java             | 35 ++++++++--
 .../TestRMAppLogAggregationStatus.java          | 10 +--
 12 files changed, 125 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 6b19128..ba0fd56 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -71,7 +71,7 @@ public class NMSimulator extends TaskRunner.Task {
   // resource manager
   private ResourceManager rm;
   // heart beat response id
-  private int RESPONSE_ID = 1;
+  private int responseId = 0;
   private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
   
   public void init(String nodeIdStr, Resource nodeResource,
@@ -131,7 +131,7 @@ public class NMSimulator extends TaskRunner.Task {
     ns.setContainersStatuses(generateContainerStatusList());
     ns.setNodeId(node.getNodeID());
     ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
-    ns.setResponseId(RESPONSE_ID ++);
+    ns.setResponseId(responseId++);
     ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
     beatRequest.setNodeStatus(ns);
     NodeHeartbeatResponse beatResponse =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index e71ddff..1016ce1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -144,8 +144,8 @@ public class NodeInfo {
       return runningApplications;
     }
 
-    public void updateNodeHeartbeatResponseForCleanup(
-            NodeHeartbeatResponse response) {
+    public void setAndUpdateNodeHeartbeatResponse(
+        NodeHeartbeatResponse response) {
     }
 
     public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@@ -179,13 +179,6 @@ public class NodeInfo {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForUpdatedContainers(
-        NodeHeartbeatResponse response) {
-      // TODO Auto-generated method stub
-      
-    }
-
-    @Override
     public List<Container> pullNewlyIncreasedContainers() {
       // TODO Auto-generated method stub
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 6b7ac3c..fdad826 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -127,9 +127,9 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
-  public void updateNodeHeartbeatResponseForCleanup(
-          NodeHeartbeatResponse nodeHeartbeatResponse) {
-    node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
+  public void setAndUpdateNodeHeartbeatResponse(
+      NodeHeartbeatResponse nodeHeartbeatResponse) {
+    node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse);
   }
 
   @Override
@@ -167,12 +167,6 @@ public class RMNodeWrapper implements RMNode {
     return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 
-  @Override
-  public void updateNodeHeartbeatResponseForUpdatedContainers(
-      NodeHeartbeatResponse response) {
-    // TODO Auto-generated method stub
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public List<Container> pullNewlyIncreasedContainers() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index a42d053..9d95f63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -403,14 +405,37 @@ public class ResourceTrackerService extends AbstractService implements
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
-      // Reset heartbeat ID since node just restarted.
-      oldNode.resetLastNodeHeartBeatResponse();
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(
-              new RMNodeReconnectEvent(nodeId, rmNode, request
-                  .getRunningApplications(), request.getNMContainerStatuses()));
+
+      if (CollectionUtils.isEmpty(request.getRunningApplications())
+          && rmNode.getState() != NodeState.DECOMMISSIONING
+          && rmNode.getHttpPort() != oldNode.getHttpPort()) {
+        // Reconnected node differs, so replace old node and start new node
+        switch (rmNode.getState()) {
+        case RUNNING:
+          ClusterMetrics.getMetrics().decrNumActiveNodes();
+          break;
+        case UNHEALTHY:
+          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+          break;
+        default:
+          LOG.debug("Unexpected Rmnode state");
+        }
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new NodeRemovedSchedulerEvent(rmNode));
+
+        this.rmContext.getRMNodes().put(nodeId, rmNode);
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMNodeStartedEvent(nodeId, null, null));
+
+      } else {
+        // Reset heartbeat ID since node just restarted.
+        oldNode.resetLastNodeHeartBeatResponse();
+
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMNodeReconnectEvent(nodeId, rmNode,
+                request.getRunningApplications(),
+                request.getNMContainerStatuses()));
+      }
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.
@@ -508,12 +533,13 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
-    if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
-        .getResponseId()) {
+    if (getNextResponseId(
+        remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
+            .getResponseId()) {
       LOG.info("Received duplicate heartbeat from node "
           + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
       return lastNodeHeartbeatResponse;
-    } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
+    } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
         .getResponseId()) {
       String message =
           "Too far behind rm response id:"
@@ -549,13 +575,11 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     // Heartbeat response
-    NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
-        .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
-            getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
-            nextHeartBeatInterval);
-    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
-    rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
-        nodeHeartBeatResponse);
+    NodeHeartbeatResponse nodeHeartBeatResponse =
+        YarnServerBuilderUtils.newNodeHeartbeatResponse(
+            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
+            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
+    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
 
     populateKeys(request, nodeHeartBeatResponse);
 
@@ -573,7 +597,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 4. Send status to RMNode, saving the latest response.
     RMNodeStatusEvent nodeStatusEvent =
-        new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
+        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
     if (request.getLogAggregationReportsForApps() != null
         && !request.getLogAggregationReportsForApps().isEmpty()) {
       nodeStatusEvent.setLogAggregationReportsForApps(request
@@ -614,6 +638,11 @@ public class ResourceTrackerService extends AbstractService implements
     return nodeHeartBeatResponse;
   }
 
+  private int getNextResponseId(int responseId) {
+    // Loop between 0 and Integer.MAX_VALUE
+    return (responseId + 1) & Integer.MAX_VALUE;
+  }
+
   private void setAppCollectorsMapToResponse(
       List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
     Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 328c040..a5615ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -141,10 +141,11 @@ public interface RMNode {
 
   /**
    * Update a {@link NodeHeartbeatResponse} with the list of containers and
-   * applications to clean up for this node.
+   * applications to clean up for this node, and the containers to be updated.
+   *
    * @param response the {@link NodeHeartbeatResponse} to update
    */
-  void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+  void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response);
 
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
 
@@ -167,13 +168,7 @@ public interface RMNode {
    * @return labels in this node
    */
   public Set<String> getNodeLabels();
-  
-  /**
-   * Update containers to be updated
-   */
-  void updateNodeHeartbeatResponseForUpdatedContainers(
-      NodeHeartbeatResponse response);
-  
+
   public List<Container> pullNewlyIncreasedContainers();
 
   OpportunisticContainersStatus getOpportunisticContainersStatus();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 2b013a0..da54eb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -598,7 +598,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   };
 
   @Override
-  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+  public void setAndUpdateNodeHeartbeatResponse(
+      NodeHeartbeatResponse response) {
     this.writeLock.lock();
 
     try {
@@ -613,38 +614,30 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       this.finishedApplications.clear();
       this.containersToSignal.clear();
       this.containersToBeRemovedFromNM.clear();
-    } finally {
-      this.writeLock.unlock();
-    }
-  };
-  
-  @VisibleForTesting
-  public Collection<Container> getToBeUpdatedContainers() {
-    return toBeUpdatedContainers.values();
-  }
-  
-  @Override
-  public void updateNodeHeartbeatResponseForUpdatedContainers(
-      NodeHeartbeatResponse response) {
-    this.writeLock.lock();
-    
-    try {
+
       response.addAllContainersToUpdate(toBeUpdatedContainers.values());
       toBeUpdatedContainers.clear();
 
       // NOTE: This is required for backward compatibility.
       response.addAllContainersToDecrease(toBeDecreasedContainers.values());
       toBeDecreasedContainers.clear();
+
+      // Synchronously update the last response in rmNode with updated
+      // responseId
+      this.latestNodeHeartBeatResponse = response;
     } finally {
       this.writeLock.unlock();
     }
+  };
+
+  @VisibleForTesting
+  public Collection<Container> getToBeUpdatedContainers() {
+    return toBeUpdatedContainers.values();
   }
 
   @Override
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
-
     this.readLock.lock();
-
     try {
       return this.latestNodeHeartBeatResponse;
     } finally {
@@ -818,7 +811,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private static NodeHealthStatus updateRMNodeFromStatusEvents(
       RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) {
     // Switch the last heartbeatresponse.
-    rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
     NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
     rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
     rmNode.setLastHealthReportTime(remoteNodeHealthStatus
@@ -912,21 +904,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             rmNode.context.getDispatcher().getEventHandler().handle(
                 new NodeAddedSchedulerEvent(rmNode));
           }
-        } else {
-          // Reconnected node differs, so replace old node and start new node
-          switch (rmNode.getState()) {
-            case RUNNING:
-              ClusterMetrics.getMetrics().decrNumActiveNodes();
-              break;
-            case UNHEALTHY:
-              ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-              break;
-            default:
-              LOG.debug("Unexpected Rmnode state");
-            }
-            rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-            rmNode.context.getDispatcher().getEventHandler().handle(
-                new RMNodeStartedEvent(newNode.getNodeID(), null, null));
         }
 
       } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index f9fe159..c79f270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -35,20 +34,16 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 public class RMNodeStatusEvent extends RMNodeEvent {
 
   private final NodeStatus nodeStatus;
-  private final NodeHeartbeatResponse latestResponse;
   private List<LogAggregationReport> logAggregationReportsForApps;
 
-  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
-      NodeHeartbeatResponse latestResponse) {
-    this(nodeId, nodeStatus, latestResponse, null);
+  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) {
+    this(nodeId, nodeStatus, null);
   }
 
   public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
-      NodeHeartbeatResponse latestResponse,
       List<LogAggregationReport> logAggregationReportsForApps) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeStatus = nodeStatus;
-    this.latestResponse = latestResponse;
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 
@@ -60,10 +55,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getContainersStatuses();
   }
 
-  public NodeHeartbeatResponse getLatestResponse() {
-    return this.latestResponse;
-  }
-  
   public List<ApplicationId> getKeepAliveAppIds() {
     return this.nodeStatus.getKeepAliveApplications();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 05b51e3..0a06e82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -131,7 +131,7 @@ public class MockNM {
             container.getResource());
     List<Container> increasedConts = Collections.singletonList(container);
     nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
-        true, ++responseId);
+        true, responseId);
   }
 
   public void addRegisteringCollector(ApplicationId appId,
@@ -190,12 +190,13 @@ public class MockNM {
         }
       }
     }
+    responseId = 0;
     return registrationResponse;
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
-        Collections.<Container>emptyList(), isHealthy, ++responseId);
+        Collections.<Container>emptyList(), isHealthy, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@@ -208,12 +209,12 @@ public class MockNM {
     containerStatusList.add(containerStatus);
     Log.getLog().info("ContainerStatus: " + containerStatus);
     return nodeHeartbeat(containerStatusList,
-        Collections.<Container>emptyList(), true, ++responseId);
+        Collections.<Container>emptyList(), true, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
-    return nodeHeartbeat(conts, isHealthy, ++responseId);
+    return nodeHeartbeat(conts, isHealthy, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -229,7 +230,7 @@ public class MockNM {
   public NodeHeartbeatResponse nodeHeartbeat(
       List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
-        isHealthy, ++responseId);
+        isHealthy, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
@@ -265,7 +266,8 @@ public class MockNM {
 
     NodeHeartbeatResponse heartbeatResponse =
         resourceTracker.nodeHeartbeat(req);
-    
+    responseId = heartbeatResponse.getResponseId();
+
     MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
     if (masterKeyFromRM != null
         && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
@@ -303,4 +305,8 @@ public class MockNM {
   public String getVersion() {
     return version;
   }
+
+  public void setResponseId(int id) {
+    this.responseId = id;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 317c648..d6549b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -205,7 +205,8 @@ public class MockNodes {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+    public void setAndUpdateNodeHeartbeatResponse(
+        NodeHeartbeatResponse response) {
     }
 
     @Override
@@ -246,12 +247,6 @@ public class MockNodes {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForUpdatedContainers(
-        NodeHeartbeatResponse response) {
-      
-    }
-
-    @Override
     public List<Container> pullNewlyIncreasedContainers() {
       return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 3657123..487d226 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -164,15 +164,12 @@ public class TestRMNodeTransitions {
   
   private RMNodeStatusEvent getMockRMNodeStatusEvent(
       List<ContainerStatus> containerStatus) {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
     
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     if (containerStatus != null) {
       doReturn(containerStatus).when(event).getContainers();
@@ -181,15 +178,12 @@ public class TestRMNodeTransitions {
   }
   
   private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
 
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
     return event;
@@ -202,15 +196,12 @@ public class TestRMNodeTransitions {
   }
 
   private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
 
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     doReturn(null).when(event).getKeepAliveAppIds();
     return event;
@@ -646,7 +637,7 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     Assert.assertEquals(1, node.getAppsToCleanup().size());
     NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
-    node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+    node.setAndUpdateNodeHeartbeatResponse(hbrsp);
     Assert.assertEquals(0, node.getContainersToCleanUp().size());
     Assert.assertEquals(0, node.getAppsToCleanup().size());
     Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
@@ -1108,7 +1099,8 @@ public class TestRMNodeTransitions {
 
     NodeHeartbeatResponse hbrsp =
         Records.newRecord(NodeHeartbeatResponse.class);
-    node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+    node.setAndUpdateNodeHeartbeatResponse(hbrsp);
+
     Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
     Assert.assertEquals(0, node.getCompletedContainers().size());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index fc6326e..96e4451 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -801,7 +801,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         Records.newRecord(NodeHeartbeatRequest.class);
     heartbeatReq.setNodeLabels(null); // Node heartbeat label update
     nodeStatusObject = getNodeStatusObject(nodeId);
-    nodeStatusObject.setResponseId(responseId+2);
+    nodeStatusObject.setResponseId(responseId+1);
     heartbeatReq.setNodeStatus(nodeStatusObject);
     heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
         .getNMTokenMasterKey());
@@ -1128,8 +1128,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         "", System.currentTimeMillis());
     NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
         statusList, null, nodeHealth, null, null, null);
-    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus,
-        nodeHeartbeat1));
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
 
     Assert.assertEquals(1, node1.getRunningApps().size());
     Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));
@@ -1145,8 +1144,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     statusList.add(status2);
     nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
         statusList, null, nodeHealth, null, null, null);
-    node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus,
-        nodeHeartbeat2));
+    node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus));
     Assert.assertEquals(1, node2.getRunningApps().size());
     Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
 
@@ -2290,4 +2288,31 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
       }
     }
   }
+
+  @Test
+  public void testResponseIdOverflow() throws Exception {
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+
+    // prepare the responseId that's about to overflow
+    RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE);
+
+    nm1.setResponseId(Integer.MAX_VALUE);
+
+    // heartbeat twice and check responseId
+    nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+    Assert.assertEquals(0, nodeHeartbeat.getResponseId());
+
+    nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+    Assert.assertEquals(1, nodeHeartbeat.getResponseId());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 677990b..c2bc611 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -172,7 +172,7 @@ public class TestRMAppLogAggregationStatus {
     NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
         new ArrayList<ContainerStatus>(), null,
         NodeHealthStatus.newInstance(true, null, 0), null, null, null);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp));
 
     List<LogAggregationReport> node2ReportForApp =
@@ -186,7 +186,7 @@ public class TestRMAppLogAggregationStatus {
     NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
         new ArrayList<ContainerStatus>(), null,
         NodeHealthStatus.newInstance(true, null, 0), null, null, null);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
         node2ReportForApp));
     // node1 and node2 has updated its log aggregation status
     // verify that the log aggregation status for node1, node2
@@ -223,7 +223,7 @@ public class TestRMAppLogAggregationStatus {
         LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode1_2);
     node1ReportForApp2.add(report1_2);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp2));
 
     // verify that the log aggregation status for node1
@@ -291,7 +291,7 @@ public class TestRMAppLogAggregationStatus {
       LogAggregationStatus.SUCCEEDED, ""));
     // For every logAggregationReport cached in memory, we can only save at most
     // 10 diagnostic messages/failure messages
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp3));
 
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();
@@ -335,7 +335,7 @@ public class TestRMAppLogAggregationStatus {
           LogAggregationStatus.FAILED, "");
     node2ReportForApp2.add(report2_2);
     node2ReportForApp2.add(report2_3);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
         node2ReportForApp2));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org