You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/03/20 06:09:32 UTC

[hadoop] branch HDFS-13891 updated: HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/HDFS-13891 by this push:
     new 0c686b7  HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.
0c686b7 is described below

commit 0c686b72068d0546e7461057dc2b95258fea7e23
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Mar 20 11:12:49 2019 +0530

    HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.
---
 .../hdfs/server/federation/router/Router.java      |  38 ++-
 .../hdfs/server/federation/MockNamenode.java       | 225 +++++++++++++++++
 .../router/TestRouterNamenodeMonitoring.java       | 278 +++++++++++++++------
 3 files changed, 444 insertions(+), 97 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 7d112f9..9e18ebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -497,27 +497,25 @@ public class Router extends CompositeService {
     }
 
     // Create heartbeat services for a list specified by the admin
-    String namenodes = this.conf.get(
+    Collection<String> namenodes = this.conf.getTrimmedStringCollection(
         RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
-    if (namenodes != null) {
-      for (String namenode : namenodes.split(",")) {
-        String[] namenodeSplit = namenode.split("\\.");
-        String nsId = null;
-        String nnId = null;
-        if (namenodeSplit.length == 2) {
-          nsId = namenodeSplit[0];
-          nnId = namenodeSplit[1];
-        } else if (namenodeSplit.length == 1) {
-          nsId = namenode;
-        } else {
-          LOG.error("Wrong Namenode to monitor: {}", namenode);
-        }
-        if (nsId != null) {
-          NamenodeHeartbeatService heartbeatService =
-              createNamenodeHeartbeatService(nsId, nnId);
-          if (heartbeatService != null) {
-            ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
-          }
+    for (String namenode : namenodes) {
+      String[] namenodeSplit = namenode.split("\\.");
+      String nsId = null;
+      String nnId = null;
+      if (namenodeSplit.length == 2) {
+        nsId = namenodeSplit[0];
+        nnId = namenodeSplit[1];
+      } else if (namenodeSplit.length == 1) {
+        nsId = namenode;
+      } else {
+        LOG.error("Wrong Namenode to monitor: {}", namenode);
+      }
+      if (nsId != null) {
+        NamenodeHeartbeatService heartbeatService =
+            createNamenodeHeartbeatService(nsId, nnId);
+        if (heartbeatService != null) {
+          ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
new file mode 100644
index 0000000..9b58fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.BlockingService;
+
+
+/**
+ * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
+ * used by the Routers in a mock cluster.
+ */
+public class MockNamenode {
+
+  /** Mock implementation of the Namenode. */
+  private final NamenodeProtocols mockNn;
+
+  /** HA state of the Namenode. */
+  private HAServiceState haState = HAServiceState.STANDBY;
+
+  /** RPC server of the Namenode that redirects calls to the mock. */
+  private Server rpcServer;
+  /** HTTP server of the Namenode that redirects calls to the mock. */
+  private HttpServer2 httpServer;
+
+
+  public MockNamenode() throws Exception {
+    Configuration conf = new Configuration();
+
+    this.mockNn = mock(NamenodeProtocols.class);
+    setupMock();
+    setupRPCServer(conf);
+    setupHTTPServer(conf);
+  }
+
+  /**
+   * Setup the mock of the Namenode. It offers the basic functionality for
+   * Routers to get the status.
+   * @throws IOException If the mock cannot be setup.
+   */
+  protected void setupMock() throws IOException {
+    NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
+    when(mockNn.versionRequest()).thenReturn(nsInfo);
+
+    when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
+      @Override
+      public HAServiceStatus answer(InvocationOnMock invocation)
+          throws Throwable {
+        HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
+        haStatus.setNotReadyToBecomeActive("");
+        return haStatus;
+      }
+    });
+  }
+
+  /**
+   * Setup the RPC server of the Namenode that redirects calls to the mock.
+   * @param conf Configuration of the server.
+   * @throws IOException If the RPC server cannot be setup.
+   */
+  private void setupRPCServer(final Configuration conf) throws IOException {
+    RPC.setProtocolEngine(
+        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+    ClientNamenodeProtocolServerSideTranslatorPB
+        clientNNProtoXlator =
+            new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);
+    BlockingService clientNNPbService =
+        ClientNamenodeProtocol.newReflectiveBlockingService(
+            clientNNProtoXlator);
+
+    rpcServer = new RPC.Builder(conf)
+        .setProtocol(ClientNamenodeProtocolPB.class)
+        .setInstance(clientNNPbService)
+        .setBindAddress("0.0.0.0")
+        .setPort(0)
+        .build();
+
+    NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
+        new NamenodeProtocolServerSideTranslatorPB(mockNn);
+    BlockingService nnProtoPbService =
+        NamenodeProtocolService.newReflectiveBlockingService(
+            nnProtoXlator);
+    DFSUtil.addPBProtocol(
+        conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);
+
+    DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
+        new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000);
+    BlockingService dnProtoPbService =
+        DatanodeProtocolService.newReflectiveBlockingService(
+            dnProtoPbXlator);
+    DFSUtil.addPBProtocol(
+        conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);
+
+    HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
+        new HAServiceProtocolServerSideTranslatorPB(mockNn);
+    BlockingService haProtoPbService =
+        HAServiceProtocolService.newReflectiveBlockingService(
+            haServiceProtoXlator);
+    DFSUtil.addPBProtocol(
+        conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
+
+    rpcServer.start();
+  }
+
+  /**
+   * Setup the HTTP server of the Namenode that redirects calls to the mock.
+   * @param conf Configuration of the server.
+   * @throws IOException If the HTTP server cannot be setup.
+   */
+  private void setupHTTPServer(Configuration conf) throws IOException {
+    HttpServer2.Builder builder = new HttpServer2.Builder()
+        .setName("hdfs")
+        .setConf(conf)
+        .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
+        .addEndpoint(URI.create("http://0.0.0.0:0"));
+    httpServer = builder.build();
+    httpServer.start();
+  }
+
+  /**
+   * Get the RPC port for the Mock Namenode.
+   * @return RPC port.
+   */
+  public int getRPCPort() {
+    return rpcServer.getListenerAddress().getPort();
+  }
+
+  /**
+   * Get the HTTP port for the Mock Namenode.
+   * @return HTTP port.
+   */
+  public int getHTTPPort() {
+    return httpServer.getConnectorAddress(0).getPort();
+  }
+
+  /**
+   * Get the Mock core. This is used to extend the mock.
+   * @return Mock Namenode protocol to be extended.
+   */
+  public NamenodeProtocols getMock() {
+    return mockNn;
+  }
+
+  /**
+   * Get the HA state of the Mock Namenode.
+   * @return HA state (ACTIVE or STANDBY).
+   */
+  public HAServiceState getHAServiceState() {
+    return haState;
+  }
+
+  /**
+   * Show the Mock Namenode as Active.
+   */
+  public void transitionToActive() {
+    this.haState = HAServiceState.ACTIVE;
+  }
+
+  /**
+   * Show the Mock Namenode as Standby.
+   */
+  public void transitionToStandby() {
+    this.haState = HAServiceState.STANDBY;
+  }
+
+  /**
+   * Stop the Mock Namenode. It stops all the servers.
+   * @throws Exception If it cannot stop the Namenode.
+   */
+  public void stop() throws Exception {
+    if (rpcServer != null) {
+      rpcServer.stop();
+    }
+    if (httpServer != null) {
+      httpServer.stop();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 0bea11c..1224fa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -17,127 +17,251 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
-import static org.junit.Assert.assertEquals;
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
-import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test namenodes monitor behavior in the Router.
  */
 public class TestRouterNamenodeMonitoring {
 
-  private static StateStoreDFSCluster cluster;
-  private static RouterContext routerContext;
-  private static MembershipNamenodeResolver resolver;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
 
-  private String ns0;
-  private String ns1;
-  private long initializedTime;
 
-  @Before
-  public void setUp() throws Exception {
-    // Build and start a federated cluster with HA enabled
-    cluster = new StateStoreDFSCluster(true, 2);
-    // Enable heartbeat service and local heartbeat
-    Configuration routerConf = new RouterConfigBuilder()
-        .stateStore()
-        .admin()
-        .rpc()
-        .enableLocalHeartbeat(true)
-        .heartbeat()
-        .build();
+  /** Router for the test. */
+  private Router router;
+  /** Namenodes in the cluster. */
+  private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
+  /** Nameservices in the federated cluster. */
+  private List<String> nsIds = asList("ns0", "ns1");
 
-    // Specify local node (ns0.nn1) to monitor
-    StringBuilder sb = new StringBuilder();
-    ns0 = cluster.getNameservices().get(0);
-    NamenodeContext context = cluster.getNamenodes(ns0).get(1);
-    routerConf.set(DFS_NAMESERVICE_ID, ns0);
-    routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
+  /** Time the test starts. */
+  private long initializedTime;
 
-    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
-    sb = new StringBuilder();
-    ns1 = cluster.getNameservices().get(1);
-    for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
-      String suffix = ctx.getConfSuffix();
-      if (sb.length() != 0) {
-        sb.append(",");
+
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Initialize the Mock Namenodes to monitor");
+    for (String nsId : nsIds) {
+      nns.put(nsId, new HashMap<>());
+      for (String nnId : asList("nn0", "nn1")) {
+        nns.get(nsId).put(nnId, new MockNamenode());
       }
-      sb.append(suffix);
     }
-    // override with the namenodes: ns1.nn0,ns1.nn1
-    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
 
-    cluster.addRouterOverrides(routerConf);
-    cluster.startCluster();
-    cluster.startRouters();
-    cluster.waitClusterUp();
+    LOG.info("Set nn0 to active for all nameservices");
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      nnNS.get("nn0").transitionToActive();
+      nnNS.get("nn1").transitionToStandby();
+    }
 
-    routerContext = cluster.getRandomRouter();
-    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
-        .getNamenodeResolver();
     initializedTime = Time.now();
   }
 
   @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.stopRouter(routerContext);
-      cluster.shutdown();
-      cluster = null;
+  public void cleanup() throws Exception {
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      for (MockNamenode nn : nnNS.values()) {
+        nn.stop();
+      }
+    }
+    nns.clear();
+
+    if (router != null) {
+      router.stop();
+    }
+  }
+
+  /**
+   * Get the configuration of the cluster which contains all the Namenodes and
+   * their addresses.
+   * @return Configuration containing all the Namenodes.
+   */
+  private Configuration getNamenodesConfig() {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
+        StringUtils.join(",", nns.keySet()));
+    for (String nsId : nns.keySet()) {
+      Set<String> nnIds = nns.get(nsId).keySet();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
+      sb.append(".").append(nsId);
+      conf.set(sb.toString(), StringUtils.join(",", nnIds));
+
+      for (String nnId : nnIds) {
+        final MockNamenode nn = nns.get(nsId).get(nnId);
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
+      }
     }
+    return conf;
   }
 
   @Test
   public void testNamenodeMonitoring() throws Exception {
-    // Set nn0 to active for all nameservices
-    for (String ns : cluster.getNameservices()) {
-      cluster.switchToActive(ns, "nn0");
-      cluster.switchToStandby(ns, "nn1");
-    }
+    Configuration nsConf = getNamenodesConfig();
 
-    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
-        .getRouter().getNamenodeHeartbeatServices();
-    // manually trigger the heartbeat
+    // Setup the State Store for the Router to use
+    Configuration stateStoreConfig = getStateStoreConfiguration();
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MountTableResolver.class, FileSubclusterResolver.class);
+
+    Configuration routerConf = new RouterConfigBuilder(nsConf)
+        .enableLocalHeartbeat(true)
+        .heartbeat()
+        .stateStore()
+        .rpc()
+        .build();
+
+    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
+        "ns1.nn0,ns1.nn1");
+    routerConf.addResource(stateStoreConfig);
+
+    // Specify local node (ns0.nn1) to monitor
+    routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
+    routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+    // Start the Router with the namenodes to monitor
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    // Manually trigger the heartbeat and update the values
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
     for (NamenodeHeartbeatService service : heartbeatServices) {
       service.periodicInvoke();
     }
-
+    MembershipNamenodeResolver resolver =
+        (MembershipNamenodeResolver) router.getNamenodeResolver();
     resolver.loadCache(true);
-    List<? extends FederationNamenodeContext> namespaceInfo0 =
-        resolver.getNamenodesForNameserviceId(ns0);
-    List<? extends FederationNamenodeContext> namespaceInfo1 =
-        resolver.getNamenodesForNameserviceId(ns1);
 
-    // The modified date won't be updated in ns0.nn0 since it isn't
-    // monitored by the Router.
-    assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
-    assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
+    // Check that the monitored values are expected
+    final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
+    for (String nsId : nns.keySet()) {
+      List<? extends FederationNamenodeContext> nnReports =
+          resolver.getNamenodesForNameserviceId(nsId);
+      namespaceInfo.addAll(nnReports);
+    }
+    for (FederationNamenodeContext nnInfo : namespaceInfo) {
+      long modTime = nnInfo.getDateModified();
+      long diff = modTime - initializedTime;
+      if ("ns0".equals(nnInfo.getNameserviceId()) &&
+          "nn0".equals(nnInfo.getNamenodeId())) {
+        // The modified date won't be updated in ns0.nn0
+        // since it isn't monitored by the Router.
+        assertTrue(nnInfo + " shouldn't be updated: " + diff,
+            modTime < initializedTime);
+      } else {
+        // other namnodes should be updated as expected
+        assertTrue(nnInfo + " should be updated: " + diff,
+            modTime > initializedTime);
+      }
+    }
+  }
+
+  @Test
+  public void testNamenodeMonitoringConfig() throws Exception {
+    testConfig(asList(), "");
+    testConfig(asList("ns1.nn0"), "ns1.nn0");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
+  }
+
+  /**
+   * Test if configuring a Router to monitor particular Namenodes actually
+   * takes effect.
+   * @param expectedNNs Namenodes that should be monitored.
+   * @param confNsIds Router configuration setting for Namenodes to monitor.
+   */
+  private void testConfig(
+      Collection<String> expectedNNs, String confNsIds) {
+
+    // Setup and start the Router
+    Configuration conf = getNamenodesConfig();
+    Configuration routerConf = new RouterConfigBuilder(conf)
+        .heartbeat(true)
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
+    router = new Router();
+    router.init(routerConf);
 
-    // other namnodes should be updated as expected
-    assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
-    assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
+    // Test the heartbeat services of the Router
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
+    assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
+  }
 
-    assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
-    assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
+  /**
+   * Assert that the namenodes monitored by the Router are the expected.
+   * @param expected Expected namenodes.
+   * @param actual Actual heartbeat services for the Router
+   */
+  private static void assertNamenodeHeartbeatService(
+      Collection<String> expected,
+      Collection<NamenodeHeartbeatService> actual) {
 
-    assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
-    assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
+    final Set<String> actualSet = new TreeSet<>();
+    for (NamenodeHeartbeatService heartbeatService : actual) {
+      NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
+      StringBuilder sb = new StringBuilder();
+      sb.append(report.getNameserviceId());
+      sb.append(".");
+      sb.append(report.getNamenodeId());
+      actualSet.add(sb.toString());
+    }
+    assertTrue(expected + " does not contain all " + actualSet,
+        expected.containsAll(actualSet));
+    assertTrue(actualSet + " does not contain all " + expected,
+        actualSet.containsAll(expected));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org