You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2016/02/02 00:16:42 UTC
hadoop git commit: YARN-3102. Decommisioned Nodes not listed in Web
UI. Contributed by Kuhu Shukla (cherry picked from commit
ed55950164a66e08fa34e30dba1030c5a986d1f1)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 fc8d9cc75 -> 36aae8050
YARN-3102. Decommisioned Nodes not listed in Web UI. Contributed by Kuhu Shukla
(cherry picked from commit ed55950164a66e08fa34e30dba1030c5a986d1f1)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/36aae805
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/36aae805
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/36aae805
Branch: refs/heads/branch-2
Commit: 36aae8050e4bd49752b74ed8d83d3cce7da48c71
Parents: fc8d9cc
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Feb 1 23:15:26 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Feb 1 23:16:18 2016 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/NodesListManager.java | 119 +++++++++++++-
.../server/resourcemanager/ResourceManager.java | 5 +
.../resourcemanager/rmnode/RMNodeImpl.java | 24 ++-
.../yarn/server/resourcemanager/MockRM.java | 16 ++
.../server/resourcemanager/TestRMRestart.java | 11 +-
.../TestResourceTrackerService.java | 154 +++++++++++++++----
7 files changed, 282 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e781ca5..40715ad 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1392,6 +1392,9 @@ Release 2.7.3 - UNRELEASED
YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
not available (Chang Li via jlowe)
+ YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via
+ jlowe)
+
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 96307a7..e6251fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.Clock;
@@ -96,7 +98,7 @@ public class NodesListManager extends CompositeService implements
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
this.hostsReader =
createHostsFileReader(this.includesFile, this.excludesFile);
- setDecomissionedNMsMetrics();
+ setDecomissionedNMs();
printConfiguredHosts();
} catch (YarnException ex) {
disableHostsFileReader(ex);
@@ -158,9 +160,24 @@ public class NodesListManager extends CompositeService implements
}
}
- private void setDecomissionedNMsMetrics() {
+ private void setDecomissionedNMs() {
Set<String> excludeList = hostsReader.getExcludedHosts();
- ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
+ for (final String host : excludeList) {
+ UnknownNodeId nodeId = new UnknownNodeId(host);
+ RMNodeImpl rmNode = new RMNodeImpl(nodeId,
+ rmContext, host, -1, -1, new UnknownNode(host), null, null);
+
+ RMNode prevRMNode =
+ rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+ if (prevRMNode != null) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(prevRMNode.getNodeID(),
+ RMNodeEventType.DECOMMISSION));
+ } else {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ }
+ }
}
@VisibleForTesting
@@ -335,7 +352,7 @@ public class NodesListManager extends CompositeService implements
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
this.hostsReader =
createHostsFileReader(this.includesFile, this.excludesFile);
- setDecomissionedNMsMetrics();
+ setDecomissionedNMs();
} catch (IOException ioe2) {
// Should *never* happen
this.hostsReader = null;
@@ -418,4 +435,98 @@ public class NodesListManager extends CompositeService implements
}
}
}
+
+ /**
+ * A NodeId instance needed upon startup for populating inactive nodes Map.
+ * It only knows the hostname/IP; getPort() always returns -1 (no valid port).
+ */
+ public static class UnknownNodeId extends NodeId {
+
+ private String host; // hostname or IP handed to the constructor
+
+ public UnknownNodeId(String host) {
+ this.host = host;
+ }
+
+ @Override
+ public String getHost() {
+ return this.host;
+ }
+
+ @Override
+ protected void setHost(String hst) {
+ // No-op: the argument is ignored; host is assigned only in the constructor.
+ }
+
+ @Override
+ public int getPort() {
+ return -1;
+ }
+
+ @Override
+ protected void setPort(int port) {
+ // No-op: the argument is ignored; getPort() always returns -1.
+ }
+
+ @Override
+ protected void build() {
+ // No-op in this implementation.
+ }
+ }
+
+ /**
+ * A Node instance needed upon startup for populating inactive nodes Map.
+ * It only knows its hostname/IP; location, parent and level are fixed values.
+ */
+ private static class UnknownNode implements Node {
+
+ private String host; // hostname or IP; also returned by getName()
+
+ public UnknownNode(String host) {
+ this.host = host;
+ }
+
+ @Override
+ public String getNetworkLocation() {
+ return null; // no network location is tracked for this node
+ }
+
+ @Override
+ public void setNetworkLocation(String location) {
+ // No-op: the argument is ignored; getNetworkLocation() stays null.
+ }
+
+ @Override
+ public String getName() {
+ return host;
+ }
+
+ @Override
+ public Node getParent() {
+ return null; // no parent is tracked for this node
+ }
+
+ @Override
+ public void setParent(Node parent) {
+ // No-op: the argument is ignored; getParent() stays null.
+ }
+
+ @Override
+ public int getLevel() {
+ return 0;
+ }
+
+ @Override
+ public void setLevel(int i) {
+ // No-op: the argument is ignored; getLevel() stays 0.
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String hst) {
+ this.host = hst;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 40d627e..b2950bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -203,6 +203,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
clusterTimeStamp = timestamp;
}
+ @VisibleForTesting
+ Dispatcher getRmDispatcher() { // package-private accessor for tests
+ return rmDispatcher;
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3873e5f..433e189 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -172,6 +173,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
+ RMNodeEventType.DECOMMISSION,
+ new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
//Transitions from RUNNING state
.addTransition(NodeState.RUNNING,
@@ -691,6 +695,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
+ case NEW:
+ break;
default:
LOG.warn("Unexpected initial state");
}
@@ -768,12 +774,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
List<NMContainerStatus> containers = null;
NodeId nodeId = rmNode.nodeId;
- if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
- // Old node rejoining
- RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
- rmNode.context.getInactiveRMNodes().remove(nodeId);
- rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
+ RMNode previousRMNode =
+ rmNode.context.getInactiveRMNodes().remove(nodeId);
+ if (previousRMNode != null) {
+ rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
+ NodesListManager.UnknownNodeId unknownNodeId =
+ new NodesListManager.UnknownNodeId(nodeId.getHost());
+ previousRMNode =
+ rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
+ if (previousRMNode != null) {
+ ClusterMetrics.getMetrics().decrDecommisionedNMs();
+ }
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
@@ -785,7 +797,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
}
-
+
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index f6b1f43..d5b64c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -142,6 +144,20 @@ public class MockRM extends ResourceManager {
}
}
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new DrainDispatcher(); // the type drainEvents() requires
+ }
+
+ public void drainEvents() { // delegates to DrainDispatcher.await()
+ Dispatcher rmDispatcher = getRmDispatcher();
+ if (rmDispatcher instanceof DrainDispatcher) {
+ ((DrainDispatcher) rmDispatcher).await(); // presumably blocks until pending events are handled - confirm
+ } else { // reached when the dispatcher is not a DrainDispatcher
+ throw new UnsupportedOperationException("Not a Drain Dispatcher!");
+ }
+ }
+
public void waitForState(ApplicationId appId, RMAppState finalState)
throws Exception {
RMApp app = getRMContext().getRMApps().get(appId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index e999e6b..028afb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1888,15 +1888,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
writeToHostsFile("");
- final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = null, rm2 = null;
try {
- rm1 = new MockRM(conf) {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
- };
+ rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
@@ -1917,7 +1911,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
- dispatcher.await();
+ rm1.drainEvents();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@@ -1930,6 +1924,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// restart RM.
rm2 = new MockRM(conf);
rm2.start();
+ rm2.drainEvents();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36aae805/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e42ed91..e0fd9ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -168,27 +168,21 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
.getAbsolutePath());
writeToHostsFile("");
- final DrainDispatcher dispatcher = new DrainDispatcher();
- rm = new MockRM(conf) {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
- };
+ rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
- dispatcher.await();
+ rm.drainEvents();
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
- dispatcher.await();
+ rm.drainEvents();
// To test that IPs also work
String ip = NetUtils.normalizeHostName("localhost");
@@ -207,15 +201,15 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
- dispatcher.await();
+ rm.drainEvents();
writeToHostsFile("");
rm.getNodesListManager().refreshNodes(conf);
nm3 = rm.registerNode("localhost:4433", 1024);
- dispatcher.await();
+ rm.drainEvents();
nodeHeartbeat = nm3.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
// decommissined node is 1 since 1 node is rejoined after updating exclude
// file
@@ -990,7 +984,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
@Test
public void testReconnectNode() throws Exception {
- final DrainDispatcher dispatcher = new DrainDispatcher();
rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
@@ -1001,11 +994,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
};
}
-
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
};
rm.start();
@@ -1013,7 +1001,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
MockNM nm2 = rm.registerNode("host2:5678", 5120);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
- dispatcher.await();
+ rm.drainEvents();
checkUnealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@@ -1024,7 +1012,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm1 = rm.registerNode("host1:1234", 5120);
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
@@ -1032,23 +1020,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm2 = rm.registerNode("host2:5678", 5120);
response = nm2.nodeHeartbeat(false);
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
// unhealthy node changed back to healthy
nm2 = rm.registerNode("host2:5678", 5120);
- dispatcher.await();
+ rm.drainEvents();
response = nm2.nodeHeartbeat(true);
response = nm2.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
// reconnect of node with changed capability
nm1 = rm.registerNode("host2:5678", 10240);
- dispatcher.await();
+ rm.drainEvents();
response = nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
@@ -1056,9 +1044,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
runningApps.add(ApplicationId.newInstance(1, 0));
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
- dispatcher.await();
+ rm.drainEvents();
response = nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
@@ -1066,10 +1054,10 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
nm1.setHttpPort(3);
nm1.registerNode();
- dispatcher.await();
+ rm.drainEvents();
response = nm1.nodeHeartbeat(true);
response = nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
Assert.assertEquals(3, rmNode.getHttpPort());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
@@ -1184,14 +1172,116 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
}
+ @Test(timeout = 30000)
+ public void testInitDecommMetric() throws Exception {
+ testInitDecommMetricHelper(true); // hasIncludeList = true
+ testInitDecommMetricHelper(false); // hasIncludeList = false
+ }
+
+ public void testInitDecommMetricHelper(boolean hasIncludeList)
+ throws Exception { // hasIncludeList: also configure an include file
+ Configuration conf = new Configuration();
+ rm = new MockRM(conf);
+ rm.start();
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ // Put host1 in the exclude file and point the configuration at it.
+ File excludeHostFile =
+ new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+ writeToHostsFile(excludeHostFile, "host1");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+
+ if (hasIncludeList) {
+ writeToHostsFile(hostFile, "host1", "host2");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ }
+ rm.getNodesListManager().refreshNodes(conf);
+ rm.drainEvents();
+ rm.stop();
+ // Start a second RM with the same conf, i.e. with host1 already excluded.
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ nm1 = rm1.registerNode("host1:1234", 5120);
+ nm2 = rm1.registerNode("host2:5678", 10240);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ rm1.drainEvents();
+ Assert.assertEquals("Number of Decommissioned nodes should be 1",
+ 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
+ " decommissioned node",
+ 1, rm1.getRMContext().getInactiveRMNodes().size());
+ excludeHostFile =
+ new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+ writeToHostsFile(excludeHostFile, ""); // empty the exclude file
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+ rm1.getNodesListManager().refreshNodes(conf);
+ nm1 = rm1.registerNode("host1:1234", 5120); // host1 registers again
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ rm1.drainEvents();
+ Assert.assertEquals("The decommissioned nodes metric should have " +
+ "decremented to 0",
+ 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ Assert.assertEquals("The active nodes metric should be 2",
+ 2, ClusterMetrics.getMetrics().getNumActiveNMs());
+ Assert.assertEquals("The inactive RMNodes entry should have been removed",
+ 0, rm1.getRMContext().getInactiveRMNodes().size());
+ rm1.drainEvents();
+ rm1.stop();
+ }
+
+ @Test(timeout = 30000)
+ public void testInitDecommMetricNoRegistration() throws Exception {
+ Configuration conf = new Configuration();
+ rm = new MockRM(conf);
+ rm.start();
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ // host3 is excluded below but never registers or heartbeats in this test
+ File excludeHostFile =
+ new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+ writeToHostsFile(excludeHostFile, "host3", "host2");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+ writeToHostsFile(hostFile, "host1", "host2");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ rm.getNodesListManager().refreshNodes(conf);
+ rm.drainEvents();
+ Assert.assertEquals("The decommissioned nodes metric should be 1 ",
+ 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ rm.stop();
+ // Second RM with the same conf: both excluded hosts (host2, host3) expected.
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ rm1.getNodesListManager().refreshNodes(conf);
+ rm1.drainEvents();
+ Assert.assertEquals("The decommissioned nodes metric should be 2 ",
+ 2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ rm1.stop();
+ }
+
private void writeToHostsFile(String... hosts) throws IOException {
- if (!hostFile.exists()) {
+ writeToHostsFile(hostFile, hosts);
+ }
+
+ private void writeToHostsFile(File file, String... hosts)
+ throws IOException {
+ if (!file.exists()) {
TEMP_DIR.mkdirs();
- hostFile.createNewFile();
+ file.createNewFile();
}
FileOutputStream fStream = null;
try {
- fStream = new FileOutputStream(hostFile);
+ fStream = new FileOutputStream(file);
for (int i = 0; i < hosts.length; i++) {
fStream.write(hosts[i].getBytes());
fStream.write("\n".getBytes());