You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/03/20 06:09:32 UTC
[hadoop] branch HDFS-13891 updated: HDFS-14351. RBF: Optimize
configuration item resolving for monitor namenode. Contributed by He
Xiaoqiao and Inigo Goiri.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-13891 by this push:
new 0c686b7 HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.
0c686b7 is described below
commit 0c686b72068d0546e7461057dc2b95258fea7e23
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Mar 20 11:12:49 2019 +0530
HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.
---
.../hdfs/server/federation/router/Router.java | 38 ++-
.../hdfs/server/federation/MockNamenode.java | 225 +++++++++++++++++
.../router/TestRouterNamenodeMonitoring.java | 278 +++++++++++++++------
3 files changed, 444 insertions(+), 97 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 7d112f9..9e18ebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -497,27 +497,25 @@ public class Router extends CompositeService {
}
// Create heartbeat services for a list specified by the admin
- String namenodes = this.conf.get(
+ Collection<String> namenodes = this.conf.getTrimmedStringCollection(
RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
- if (namenodes != null) {
- for (String namenode : namenodes.split(",")) {
- String[] namenodeSplit = namenode.split("\\.");
- String nsId = null;
- String nnId = null;
- if (namenodeSplit.length == 2) {
- nsId = namenodeSplit[0];
- nnId = namenodeSplit[1];
- } else if (namenodeSplit.length == 1) {
- nsId = namenode;
- } else {
- LOG.error("Wrong Namenode to monitor: {}", namenode);
- }
- if (nsId != null) {
- NamenodeHeartbeatService heartbeatService =
- createNamenodeHeartbeatService(nsId, nnId);
- if (heartbeatService != null) {
- ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
- }
+ for (String namenode : namenodes) {
+ String[] namenodeSplit = namenode.split("\\.");
+ String nsId = null;
+ String nnId = null;
+ if (namenodeSplit.length == 2) {
+ nsId = namenodeSplit[0];
+ nnId = namenodeSplit[1];
+ } else if (namenodeSplit.length == 1) {
+ nsId = namenode;
+ } else {
+ LOG.error("Wrong Namenode to monitor: {}", namenode);
+ }
+ if (nsId != null) {
+ NamenodeHeartbeatService heartbeatService =
+ createNamenodeHeartbeatService(nsId, nnId);
+ if (heartbeatService != null) {
+ ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
new file mode 100644
index 0000000..9b58fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.BlockingService;
+
+
+/**
+ * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
+ * used by the Routers in a mock cluster.
+ */
+public class MockNamenode {
+
+ /** Mock implementation of the Namenode. */
+ private final NamenodeProtocols mockNn;
+
+ /** HA state of the Namenode. */
+ private HAServiceState haState = HAServiceState.STANDBY;
+
+ /** RPC server of the Namenode that redirects calls to the mock. */
+ private Server rpcServer;
+ /** HTTP server of the Namenode that redirects calls to the mock. */
+ private HttpServer2 httpServer;
+
+
+ public MockNamenode() throws Exception {
+ Configuration conf = new Configuration();
+
+ this.mockNn = mock(NamenodeProtocols.class);
+ setupMock();
+ setupRPCServer(conf);
+ setupHTTPServer(conf);
+ }
+
+ /**
+ * Setup the mock of the Namenode. It offers the basic functionality for
+ * Routers to get the status.
+ * @throws IOException If the mock cannot be setup.
+ */
+ protected void setupMock() throws IOException {
+ NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
+ when(mockNn.versionRequest()).thenReturn(nsInfo);
+
+ when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
+ @Override
+ public HAServiceStatus answer(InvocationOnMock invocation)
+ throws Throwable {
+ HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
+ haStatus.setNotReadyToBecomeActive("");
+ return haStatus;
+ }
+ });
+ }
+
+ /**
+ * Setup the RPC server of the Namenode that redirects calls to the mock.
+ * @param conf Configuration of the server.
+ * @throws IOException If the RPC server cannot be setup.
+ */
+ private void setupRPCServer(final Configuration conf) throws IOException {
+ RPC.setProtocolEngine(
+ conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+ ClientNamenodeProtocolServerSideTranslatorPB
+ clientNNProtoXlator =
+ new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);
+ BlockingService clientNNPbService =
+ ClientNamenodeProtocol.newReflectiveBlockingService(
+ clientNNProtoXlator);
+
+ rpcServer = new RPC.Builder(conf)
+ .setProtocol(ClientNamenodeProtocolPB.class)
+ .setInstance(clientNNPbService)
+ .setBindAddress("0.0.0.0")
+ .setPort(0)
+ .build();
+
+ NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
+ new NamenodeProtocolServerSideTranslatorPB(mockNn);
+ BlockingService nnProtoPbService =
+ NamenodeProtocolService.newReflectiveBlockingService(
+ nnProtoXlator);
+ DFSUtil.addPBProtocol(
+ conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);
+
+ DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
+ new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000);
+ BlockingService dnProtoPbService =
+ DatanodeProtocolService.newReflectiveBlockingService(
+ dnProtoPbXlator);
+ DFSUtil.addPBProtocol(
+ conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);
+
+ HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
+ new HAServiceProtocolServerSideTranslatorPB(mockNn);
+ BlockingService haProtoPbService =
+ HAServiceProtocolService.newReflectiveBlockingService(
+ haServiceProtoXlator);
+ DFSUtil.addPBProtocol(
+ conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
+
+ rpcServer.start();
+ }
+
+ /**
+ * Setup the HTTP server of the Namenode that redirects calls to the mock.
+ * @param conf Configuration of the server.
+ * @throws IOException If the HTTP server cannot be setup.
+ */
+ private void setupHTTPServer(Configuration conf) throws IOException {
+ HttpServer2.Builder builder = new HttpServer2.Builder()
+ .setName("hdfs")
+ .setConf(conf)
+ .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
+ .addEndpoint(URI.create("http://0.0.0.0:0"));
+ httpServer = builder.build();
+ httpServer.start();
+ }
+
+ /**
+ * Get the RPC port for the Mock Namenode.
+ * @return RPC port.
+ */
+ public int getRPCPort() {
+ return rpcServer.getListenerAddress().getPort();
+ }
+
+ /**
+ * Get the HTTP port for the Mock Namenode.
+ * @return HTTP port.
+ */
+ public int getHTTPPort() {
+ return httpServer.getConnectorAddress(0).getPort();
+ }
+
+ /**
+ * Get the Mock core. This is used to extend the mock.
+ * @return Mock Namenode protocol to be extended.
+ */
+ public NamenodeProtocols getMock() {
+ return mockNn;
+ }
+
+ /**
+ * Get the HA state of the Mock Namenode.
+ * @return HA state (ACTIVE or STANDBY).
+ */
+ public HAServiceState getHAServiceState() {
+ return haState;
+ }
+
+ /**
+ * Show the Mock Namenode as Active.
+ */
+ public void transitionToActive() {
+ this.haState = HAServiceState.ACTIVE;
+ }
+
+ /**
+ * Show the Mock Namenode as Standby.
+ */
+ public void transitionToStandby() {
+ this.haState = HAServiceState.STANDBY;
+ }
+
+ /**
+ * Stop the Mock Namenode. It stops all the servers.
+ * @throws Exception If it cannot stop the Namenode.
+ */
+ public void stop() throws Exception {
+ if (rpcServer != null) {
+ rpcServer.stop();
+ }
+ if (httpServer != null) {
+ httpServer.stop();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 0bea11c..1224fa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -17,127 +17,251 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
-import static org.junit.Assert.assertEquals;
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
-import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test namenodes monitor behavior in the Router.
*/
public class TestRouterNamenodeMonitoring {
- private static StateStoreDFSCluster cluster;
- private static RouterContext routerContext;
- private static MembershipNamenodeResolver resolver;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
- private String ns0;
- private String ns1;
- private long initializedTime;
- @Before
- public void setUp() throws Exception {
- // Build and start a federated cluster with HA enabled
- cluster = new StateStoreDFSCluster(true, 2);
- // Enable heartbeat service and local heartbeat
- Configuration routerConf = new RouterConfigBuilder()
- .stateStore()
- .admin()
- .rpc()
- .enableLocalHeartbeat(true)
- .heartbeat()
- .build();
+ /** Router for the test. */
+ private Router router;
+ /** Namenodes in the cluster. */
+ private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
+ /** Nameservices in the federated cluster. */
+ private List<String> nsIds = asList("ns0", "ns1");
- // Specify local node (ns0.nn1) to monitor
- StringBuilder sb = new StringBuilder();
- ns0 = cluster.getNameservices().get(0);
- NamenodeContext context = cluster.getNamenodes(ns0).get(1);
- routerConf.set(DFS_NAMESERVICE_ID, ns0);
- routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
+ /** Time the test starts. */
+ private long initializedTime;
- // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
- sb = new StringBuilder();
- ns1 = cluster.getNameservices().get(1);
- for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
- String suffix = ctx.getConfSuffix();
- if (sb.length() != 0) {
- sb.append(",");
+
+ @Before
+ public void setup() throws Exception {
+ LOG.info("Initialize the Mock Namenodes to monitor");
+ for (String nsId : nsIds) {
+ nns.put(nsId, new HashMap<>());
+ for (String nnId : asList("nn0", "nn1")) {
+ nns.get(nsId).put(nnId, new MockNamenode());
}
- sb.append(suffix);
}
- // override with the namenodes: ns1.nn0,ns1.nn1
- routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
- cluster.addRouterOverrides(routerConf);
- cluster.startCluster();
- cluster.startRouters();
- cluster.waitClusterUp();
+ LOG.info("Set nn0 to active for all nameservices");
+ for (Map<String, MockNamenode> nnNS : nns.values()) {
+ nnNS.get("nn0").transitionToActive();
+ nnNS.get("nn1").transitionToStandby();
+ }
- routerContext = cluster.getRandomRouter();
- resolver = (MembershipNamenodeResolver) routerContext.getRouter()
- .getNamenodeResolver();
initializedTime = Time.now();
}
@After
- public void tearDown() {
- if (cluster != null) {
- cluster.stopRouter(routerContext);
- cluster.shutdown();
- cluster = null;
+ public void cleanup() throws Exception {
+ for (Map<String, MockNamenode> nnNS : nns.values()) {
+ for (MockNamenode nn : nnNS.values()) {
+ nn.stop();
+ }
+ }
+ nns.clear();
+
+ if (router != null) {
+ router.stop();
+ }
+ }
+
+ /**
+ * Get the configuration of the cluster which contains all the Namenodes and
+ * their addresses.
+ * @return Configuration containing all the Namenodes.
+ */
+ private Configuration getNamenodesConfig() {
+ final Configuration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_NAMESERVICES,
+ StringUtils.join(",", nns.keySet()));
+ for (String nsId : nns.keySet()) {
+ Set<String> nnIds = nns.get(nsId).keySet();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
+ sb.append(".").append(nsId);
+ conf.set(sb.toString(), StringUtils.join(",", nnIds));
+
+ for (String nnId : nnIds) {
+ final MockNamenode nn = nns.get(nsId).get(nnId);
+
+ sb = new StringBuilder();
+ sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+ sb.append(".").append(nsId);
+ sb.append(".").append(nnId);
+ conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
+
+ sb = new StringBuilder();
+ sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ sb.append(".").append(nsId);
+ sb.append(".").append(nnId);
+ conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
+ }
}
+ return conf;
}
@Test
public void testNamenodeMonitoring() throws Exception {
- // Set nn0 to active for all nameservices
- for (String ns : cluster.getNameservices()) {
- cluster.switchToActive(ns, "nn0");
- cluster.switchToStandby(ns, "nn1");
- }
+ Configuration nsConf = getNamenodesConfig();
- Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
- .getRouter().getNamenodeHeartbeatServices();
- // manually trigger the heartbeat
+ // Setup the State Store for the Router to use
+ Configuration stateStoreConfig = getStateStoreConfiguration();
+ stateStoreConfig.setClass(
+ RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+ stateStoreConfig.setClass(
+ RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MountTableResolver.class, FileSubclusterResolver.class);
+
+ Configuration routerConf = new RouterConfigBuilder(nsConf)
+ .enableLocalHeartbeat(true)
+ .heartbeat()
+ .stateStore()
+ .rpc()
+ .build();
+
+ // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
+ "ns1.nn0,ns1.nn1");
+ routerConf.addResource(stateStoreConfig);
+
+ // Specify local node (ns0.nn1) to monitor
+ routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
+ routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+ // Start the Router with the namenodes to monitor
+ router = new Router();
+ router.init(routerConf);
+ router.start();
+
+ // Manually trigger the heartbeat and update the values
+ Collection<NamenodeHeartbeatService> heartbeatServices =
+ router.getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService service : heartbeatServices) {
service.periodicInvoke();
}
-
+ MembershipNamenodeResolver resolver =
+ (MembershipNamenodeResolver) router.getNamenodeResolver();
resolver.loadCache(true);
- List<? extends FederationNamenodeContext> namespaceInfo0 =
- resolver.getNamenodesForNameserviceId(ns0);
- List<? extends FederationNamenodeContext> namespaceInfo1 =
- resolver.getNamenodesForNameserviceId(ns1);
- // The modified date won't be updated in ns0.nn0 since it isn't
- // monitored by the Router.
- assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
- assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
+ // Check that the monitored values are expected
+ final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
+ for (String nsId : nns.keySet()) {
+ List<? extends FederationNamenodeContext> nnReports =
+ resolver.getNamenodesForNameserviceId(nsId);
+ namespaceInfo.addAll(nnReports);
+ }
+ for (FederationNamenodeContext nnInfo : namespaceInfo) {
+ long modTime = nnInfo.getDateModified();
+ long diff = modTime - initializedTime;
+ if ("ns0".equals(nnInfo.getNameserviceId()) &&
+ "nn0".equals(nnInfo.getNamenodeId())) {
+ // The modified date won't be updated in ns0.nn0
+ // since it isn't monitored by the Router.
+ assertTrue(nnInfo + " shouldn't be updated: " + diff,
+ modTime < initializedTime);
+ } else {
+ // Other namenodes should be updated as expected
+ assertTrue(nnInfo + " should be updated: " + diff,
+ modTime > initializedTime);
+ }
+ }
+ }
+
+ @Test
+ public void testNamenodeMonitoringConfig() throws Exception {
+ testConfig(asList(), "");
+ testConfig(asList("ns1.nn0"), "ns1.nn0");
+ testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
+ testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
+ testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
+ testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
+ }
+
+ /**
+ * Test if configuring a Router to monitor particular Namenodes actually
+ * takes effect.
+ * @param expectedNNs Namenodes that should be monitored.
+ * @param confNsIds Router configuration setting for Namenodes to monitor.
+ */
+ private void testConfig(
+ Collection<String> expectedNNs, String confNsIds) {
+
+ // Setup and start the Router
+ Configuration conf = getNamenodesConfig();
+ Configuration routerConf = new RouterConfigBuilder(conf)
+ .heartbeat(true)
+ .build();
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
+ router = new Router();
+ router.init(routerConf);
- // other namnodes should be updated as expected
- assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
- assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
+ // Test the heartbeat services of the Router
+ Collection<NamenodeHeartbeatService> heartbeatServices =
+ router.getNamenodeHeartbeatServices();
+ assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
+ }
- assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
- assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
+ /**
+ * Assert that the namenodes monitored by the Router are the expected ones.
+ * @param expected Expected namenodes.
+ * @param actual Actual heartbeat services for the Router.
+ */
+ private static void assertNamenodeHeartbeatService(
+ Collection<String> expected,
+ Collection<NamenodeHeartbeatService> actual) {
- assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
- assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
+ final Set<String> actualSet = new TreeSet<>();
+ for (NamenodeHeartbeatService heartbeatService : actual) {
+ NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
+ StringBuilder sb = new StringBuilder();
+ sb.append(report.getNameserviceId());
+ sb.append(".");
+ sb.append(report.getNamenodeId());
+ actualSet.add(sb.toString());
+ }
+ assertTrue(expected + " does not contain all " + actualSet,
+ expected.containsAll(actualSet));
+ assertTrue(actualSet + " does not contain all " + expected,
+ actualSet.containsAll(expected));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org