You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:28:50 UTC
[11/37] hadoop git commit: YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang
YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff8378eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff8378eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff8378eb
Branch: refs/heads/HDFS-7240
Commit: ff8378eb1b960c72d18a984c7e5d145b407ca11a
Parents: 16be42d
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 25 17:47:19 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 25 17:47:19 2018 -0600
----------------------------------------------------------------------
.../yarn/sls/nodemanager/NMSimulator.java | 4 +-
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 11 +---
.../yarn/sls/scheduler/RMNodeWrapper.java | 12 +---
.../resourcemanager/ResourceTrackerService.java | 67 ++++++++++++++------
.../server/resourcemanager/rmnode/RMNode.java | 13 ++--
.../resourcemanager/rmnode/RMNodeImpl.java | 47 ++++----------
.../rmnode/RMNodeStatusEvent.java | 13 +---
.../yarn/server/resourcemanager/MockNM.java | 18 ++++--
.../yarn/server/resourcemanager/MockNodes.java | 9 +--
.../resourcemanager/TestRMNodeTransitions.java | 14 +---
.../TestResourceTrackerService.java | 35 ++++++++--
.../TestRMAppLogAggregationStatus.java | 10 +--
12 files changed, 125 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 6b19128..ba0fd56 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -71,7 +71,7 @@ public class NMSimulator extends TaskRunner.Task {
// resource manager
private ResourceManager rm;
// heart beat response id
- private int RESPONSE_ID = 1;
+ private int responseId = 0;
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
public void init(String nodeIdStr, Resource nodeResource,
@@ -131,7 +131,7 @@ public class NMSimulator extends TaskRunner.Task {
ns.setContainersStatuses(generateContainerStatusList());
ns.setNodeId(node.getNodeID());
ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
- ns.setResponseId(RESPONSE_ID ++);
+ ns.setResponseId(responseId++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
beatRequest.setNodeStatus(ns);
NodeHeartbeatResponse beatResponse =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index e71ddff..1016ce1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -144,8 +144,8 @@ public class NodeInfo {
return runningApplications;
}
- public void updateNodeHeartbeatResponseForCleanup(
- NodeHeartbeatResponse response) {
+ public void setAndUpdateNodeHeartbeatResponse(
+ NodeHeartbeatResponse response) {
}
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@@ -179,13 +179,6 @@ public class NodeInfo {
}
@Override
- public void updateNodeHeartbeatResponseForUpdatedContainers(
- NodeHeartbeatResponse response) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 6b7ac3c..fdad826 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -127,9 +127,9 @@ public class RMNodeWrapper implements RMNode {
}
@Override
- public void updateNodeHeartbeatResponseForCleanup(
- NodeHeartbeatResponse nodeHeartbeatResponse) {
- node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
+ public void setAndUpdateNodeHeartbeatResponse(
+ NodeHeartbeatResponse nodeHeartbeatResponse) {
+ node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse);
}
@Override
@@ -167,12 +167,6 @@ public class RMNodeWrapper implements RMNode {
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
- @Override
- public void updateNodeHeartbeatResponseForUpdatedContainers(
- NodeHeartbeatResponse response) {
- // TODO Auto-generated method stub
- }
-
@SuppressWarnings("unchecked")
@Override
public List<Container> pullNewlyIncreasedContainers() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index a42d053..9d95f63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -403,14 +405,37 @@ public class ResourceTrackerService extends AbstractService implements
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
- // Reset heartbeat ID since node just restarted.
- oldNode.resetLastNodeHeartBeatResponse();
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMNodeReconnectEvent(nodeId, rmNode, request
- .getRunningApplications(), request.getNMContainerStatuses()));
+
+ if (CollectionUtils.isEmpty(request.getRunningApplications())
+ && rmNode.getState() != NodeState.DECOMMISSIONING
+ && rmNode.getHttpPort() != oldNode.getHttpPort()) {
+ // Reconnected node differs, so replace old node and start new node
+ switch (rmNode.getState()) {
+ case RUNNING:
+ ClusterMetrics.getMetrics().decrNumActiveNodes();
+ break;
+ case UNHEALTHY:
+ ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+ break;
+ default:
+ LOG.debug("Unexpected Rmnode state");
+ }
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new NodeRemovedSchedulerEvent(rmNode));
+
+ this.rmContext.getRMNodes().put(nodeId, rmNode);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeStartedEvent(nodeId, null, null));
+
+ } else {
+ // Reset heartbeat ID since node just restarted.
+ oldNode.resetLastNodeHeartBeatResponse();
+
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeReconnectEvent(nodeId, rmNode,
+ request.getRunningApplications(),
+ request.getNMContainerStatuses()));
+ }
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
@@ -508,12 +533,13 @@ public class ResourceTrackerService extends AbstractService implements
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
- if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
- .getResponseId()) {
+ if (getNextResponseId(
+ remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
+ .getResponseId()) {
LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
return lastNodeHeartbeatResponse;
- } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
+ } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
.getResponseId()) {
String message =
"Too far behind rm response id:"
@@ -549,13 +575,11 @@ public class ResourceTrackerService extends AbstractService implements
}
// Heartbeat response
- NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
- .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
- getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
- nextHeartBeatInterval);
- rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
- rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
- nodeHeartBeatResponse);
+ NodeHeartbeatResponse nodeHeartBeatResponse =
+ YarnServerBuilderUtils.newNodeHeartbeatResponse(
+ getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
+ NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
+ rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
@@ -573,7 +597,7 @@ public class ResourceTrackerService extends AbstractService implements
// 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent =
- new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
+ new RMNodeStatusEvent(nodeId, remoteNodeStatus);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request
@@ -614,6 +638,11 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse;
}
+ private int getNextResponseId(int responseId) {
+ // Loop between 0 and Integer.MAX_VALUE
+ return (responseId + 1) & Integer.MAX_VALUE;
+ }
+
private void setAppCollectorsMapToResponse(
List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 328c040..a5615ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -141,10 +141,11 @@ public interface RMNode {
/**
* Update a {@link NodeHeartbeatResponse} with the list of containers and
- * applications to clean up for this node.
+ * applications to clean up for this node, and the containers to be updated.
+ *
* @param response the {@link NodeHeartbeatResponse} to update
*/
- void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+ void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response);
public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
@@ -167,13 +168,7 @@ public interface RMNode {
* @return labels in this node
*/
public Set<String> getNodeLabels();
-
- /**
- * Update containers to be updated
- */
- void updateNodeHeartbeatResponseForUpdatedContainers(
- NodeHeartbeatResponse response);
-
+
public List<Container> pullNewlyIncreasedContainers();
OpportunisticContainersStatus getOpportunisticContainersStatus();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 2b013a0..da54eb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -598,7 +598,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
};
@Override
- public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+ public void setAndUpdateNodeHeartbeatResponse(
+ NodeHeartbeatResponse response) {
this.writeLock.lock();
try {
@@ -613,38 +614,30 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.finishedApplications.clear();
this.containersToSignal.clear();
this.containersToBeRemovedFromNM.clear();
- } finally {
- this.writeLock.unlock();
- }
- };
-
- @VisibleForTesting
- public Collection<Container> getToBeUpdatedContainers() {
- return toBeUpdatedContainers.values();
- }
-
- @Override
- public void updateNodeHeartbeatResponseForUpdatedContainers(
- NodeHeartbeatResponse response) {
- this.writeLock.lock();
-
- try {
+
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear();
// NOTE: This is required for backward compatibility.
response.addAllContainersToDecrease(toBeDecreasedContainers.values());
toBeDecreasedContainers.clear();
+
+ // Synchronously update the last response in rmNode with updated
+ // responseId
+ this.latestNodeHeartBeatResponse = response;
} finally {
this.writeLock.unlock();
}
+ };
+
+ @VisibleForTesting
+ public Collection<Container> getToBeUpdatedContainers() {
+ return toBeUpdatedContainers.values();
}
@Override
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
-
this.readLock.lock();
-
try {
return this.latestNodeHeartBeatResponse;
} finally {
@@ -818,7 +811,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private static NodeHealthStatus updateRMNodeFromStatusEvents(
RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) {
// Switch the last heartbeatresponse.
- rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(remoteNodeHealthStatus
@@ -912,21 +904,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
}
- } else {
- // Reconnected node differs, so replace old node and start new node
- switch (rmNode.getState()) {
- case RUNNING:
- ClusterMetrics.getMetrics().decrNumActiveNodes();
- break;
- case UNHEALTHY:
- ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
- break;
- default:
- LOG.debug("Unexpected Rmnode state");
- }
- rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
- rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index f9fe159..c79f270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -35,20 +34,16 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeStatus nodeStatus;
- private final NodeHeartbeatResponse latestResponse;
private List<LogAggregationReport> logAggregationReportsForApps;
- public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
- NodeHeartbeatResponse latestResponse) {
- this(nodeId, nodeStatus, latestResponse, null);
+ public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) {
+ this(nodeId, nodeStatus, null);
}
public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
- NodeHeartbeatResponse latestResponse,
List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeStatus = nodeStatus;
- this.latestResponse = latestResponse;
this.logAggregationReportsForApps = logAggregationReportsForApps;
}
@@ -60,10 +55,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.nodeStatus.getContainersStatuses();
}
- public NodeHeartbeatResponse getLatestResponse() {
- return this.latestResponse;
- }
-
public List<ApplicationId> getKeepAliveAppIds() {
return this.nodeStatus.getKeepAliveApplications();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 05b51e3..0a06e82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -131,7 +131,7 @@ public class MockNM {
container.getResource());
List<Container> increasedConts = Collections.singletonList(container);
nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
- true, ++responseId);
+ true, responseId);
}
public void addRegisteringCollector(ApplicationId appId,
@@ -190,12 +190,13 @@ public class MockNM {
}
}
}
+ responseId = 0;
return registrationResponse;
}
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
- Collections.<Container>emptyList(), isHealthy, ++responseId);
+ Collections.<Container>emptyList(), isHealthy, responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@@ -208,12 +209,12 @@ public class MockNM {
containerStatusList.add(containerStatus);
Log.getLog().info("ContainerStatus: " + containerStatus);
return nodeHeartbeat(containerStatusList,
- Collections.<Container>emptyList(), true, ++responseId);
+ Collections.<Container>emptyList(), true, responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
- return nodeHeartbeat(conts, isHealthy, ++responseId);
+ return nodeHeartbeat(conts, isHealthy, responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -229,7 +230,7 @@ public class MockNM {
public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
- isHealthy, ++responseId);
+ isHealthy, responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
@@ -265,7 +266,8 @@ public class MockNM {
NodeHeartbeatResponse heartbeatResponse =
resourceTracker.nodeHeartbeat(req);
-
+ responseId = heartbeatResponse.getResponseId();
+
MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
if (masterKeyFromRM != null
&& masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
@@ -303,4 +305,8 @@ public class MockNM {
public String getVersion() {
return version;
}
+
+ public void setResponseId(int id) {
+ this.responseId = id;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 317c648..d6549b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -205,7 +205,8 @@ public class MockNodes {
}
@Override
- public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+ public void setAndUpdateNodeHeartbeatResponse(
+ NodeHeartbeatResponse response) {
}
@Override
@@ -246,12 +247,6 @@ public class MockNodes {
}
@Override
- public void updateNodeHeartbeatResponseForUpdatedContainers(
- NodeHeartbeatResponse response) {
-
- }
-
- @Override
public List<Container> pullNewlyIncreasedContainers() {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 3657123..487d226 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -164,15 +164,12 @@ public class TestRMNodeTransitions {
private RMNodeStatusEvent getMockRMNodeStatusEvent(
List<ContainerStatus> containerStatus) {
- NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
- doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
if (containerStatus != null) {
doReturn(containerStatus).when(event).getContainers();
@@ -181,15 +178,12 @@ public class TestRMNodeTransitions {
}
private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
- NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
- doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
return event;
@@ -202,15 +196,12 @@ public class TestRMNodeTransitions {
}
private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
- NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
- doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(null).when(event).getKeepAliveAppIds();
return event;
@@ -646,7 +637,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(1, node.getContainersToCleanUp().size());
Assert.assertEquals(1, node.getAppsToCleanup().size());
NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
- node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+ node.setAndUpdateNodeHeartbeatResponse(hbrsp);
Assert.assertEquals(0, node.getContainersToCleanUp().size());
Assert.assertEquals(0, node.getAppsToCleanup().size());
Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
@@ -1108,7 +1099,8 @@ public class TestRMNodeTransitions {
NodeHeartbeatResponse hbrsp =
Records.newRecord(NodeHeartbeatResponse.class);
- node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+ node.setAndUpdateNodeHeartbeatResponse(hbrsp);
+
Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
Assert.assertEquals(0, node.getCompletedContainers().size());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index fc6326e..96e4451 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -801,7 +801,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Records.newRecord(NodeHeartbeatRequest.class);
heartbeatReq.setNodeLabels(null); // Node heartbeat label update
nodeStatusObject = getNodeStatusObject(nodeId);
- nodeStatusObject.setResponseId(responseId+2);
+ nodeStatusObject.setResponseId(responseId+1);
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey());
@@ -1128,8 +1128,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
"", System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
statusList, null, nodeHealth, null, null, null);
- node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus,
- nodeHeartbeat1));
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
Assert.assertEquals(1, node1.getRunningApps().size());
Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));
@@ -1145,8 +1144,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
statusList.add(status2);
nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
statusList, null, nodeHealth, null, null, null);
- node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus,
- nodeHeartbeat2));
+ node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus));
Assert.assertEquals(1, node2.getRunningApps().size());
Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
@@ -2290,4 +2288,31 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
}
}
+
+ @Test
+ public void testResponseIdOverflow() throws Exception {
+ Configuration conf = new Configuration();
+ rm = new MockRM(conf);
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+
+ // pin the responseId at Integer.MAX_VALUE on both the RM node and the MockNM
+ RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE);
+
+ nm1.setResponseId(Integer.MAX_VALUE);
+
+ // heartbeat twice: the id is expected to wrap to 0 and then advance to 1
+ nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ Assert.assertEquals(0, nodeHeartbeat.getResponseId());
+
+ nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ Assert.assertEquals(1, nodeHeartbeat.getResponseId());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff8378eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 677990b..c2bc611 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -172,7 +172,7 @@ public class TestRMAppLogAggregationStatus {
NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
- node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp));
List<LogAggregationReport> node2ReportForApp =
@@ -186,7 +186,7 @@ public class TestRMAppLogAggregationStatus {
NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
- node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp));
// node1 and node2 has updated its log aggregation status
// verify that the log aggregation status for node1, node2
@@ -223,7 +223,7 @@ public class TestRMAppLogAggregationStatus {
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2);
node1ReportForApp2.add(report1_2);
- node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp2));
// verify that the log aggregation status for node1
@@ -291,7 +291,7 @@ public class TestRMAppLogAggregationStatus {
LogAggregationStatus.SUCCEEDED, ""));
// For every logAggregationReport cached in memory, we can only save at most
// 10 diagnostic messages/failure messages
- node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp3));
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
@@ -335,7 +335,7 @@ public class TestRMAppLogAggregationStatus {
LogAggregationStatus.FAILED, "");
node2ReportForApp2.add(report2_2);
node2ReportForApp2.add(report2_3);
- node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp2));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org