You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:10 UTC
[03/27] hadoop git commit: YARN-3222. Fixed NPE on
RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a
different port. Contributed by Rohith Sharmaks
YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2f1ec31
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2f1ec31
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2f1ec31
Branch: refs/heads/YARN-2928
Commit: b2f1ec312ee431aef762cfb49cb29cd6f4661e86
Parents: 15b7076
Author: Jian He <ji...@apache.org>
Authored: Tue Mar 3 16:25:57 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Mar 3 16:28:28 2015 -0800
----------------------------------------------------------------------
.../resourcemanager/rmnode/RMNodeImpl.java | 34 +++++++++++---------
.../yarn/server/resourcemanager/MockNM.java | 6 +++-
.../TestResourceTrackerService.java | 17 +++++++++-
3 files changed, 39 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2f1ec31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 9701775..c556b80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -571,12 +571,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
-
+
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
- if (rmNode.getState() != NodeState.UNHEALTHY) {
- // Only add new node if old state is not UNHEALTHY
+ if (rmNode.getState().equals(NodeState.RUNNING)) {
+ // Only add new node if old state is RUNNING
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(newNode));
}
@@ -599,30 +599,32 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} else {
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
- rmNode.totalCapability = newNode.getTotalCapability();
+ boolean isCapabilityChanged = false;
+ if (rmNode.getTotalCapability() != newNode.getTotalCapability()) {
+ rmNode.totalCapability = newNode.getTotalCapability();
+ isCapabilityChanged = true;
+ }
handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
- }
- if (null != reconnectEvent.getRunningApplications()) {
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
- }
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
- if (rmNode.getState().equals(NodeState.RUNNING)) {
- // Update scheduler node's capacity for reconnect node.
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeResourceUpdateSchedulerEvent(rmNode,
- ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
+ if (isCapabilityChanged
+ && rmNode.getState().equals(NodeState.RUNNING)) {
+ // Update scheduler node's capacity for reconnect node.
+ rmNode.context
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
+ .newInstance(newNode.getTotalCapability(), -1)));
+ }
}
-
}
private void handleNMContainerStatus(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2f1ec31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 5f53805..c917f79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -51,7 +51,7 @@ public class MockNM {
private final int memory;
private final int vCores;
private ResourceTrackerService resourceTracker;
- private final int httpPort = 2;
+ private int httpPort = 2;
private MasterKey currentContainerTokenMasterKey;
private MasterKey currentNMTokenMasterKey;
private String version;
@@ -87,6 +87,10 @@ public class MockNM {
return httpPort;
}
+ public void setHttpPort(int port) {
+ httpPort = port;
+ }
+
public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
this.resourceTracker = resourceTracker;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2f1ec31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7c12848..a904dc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -623,7 +624,7 @@ public class TestResourceTrackerService {
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
-
+
// reconnect of node with changed capability and running applications
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
runningApps.add(ApplicationId.newInstance(1, 0));
@@ -633,6 +634,20 @@ public class TestResourceTrackerService {
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
+
+ // reconnect healthy node changing http port
+ nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
+ nm1.setHttpPort(3);
+ nm1.registerNode();
+ dispatcher.await();
+ response = nm1.nodeHeartbeat(true);
+ response = nm1.nodeHeartbeat(true);
+ dispatcher.await();
+ RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ Assert.assertEquals(3, rmNode.getHttpPort());
+ Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
+ Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
+
}
private void writeToHostsFile(String... hosts) throws IOException {