You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/12/13 18:51:34 UTC

[hadoop] branch trunk updated: HDFS-15016. RBF: getDatanodeReport() should return the latest update. Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fe924b  HDFS-15016. RBF: getDatanodeReport() should return the latest update. Contributed by Inigo Goiri.
7fe924b is described below

commit 7fe924b1c03a2fa45188027bdc0a36cb6c8b4ba4
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Fri Dec 13 10:51:14 2019 -0800

    HDFS-15016. RBF: getDatanodeReport() should return the latest update. Contributed by Inigo Goiri.
---
 .../federation/router/RouterClientProtocol.java    |   7 +-
 .../server/federation/router/RouterRpcServer.java  |   3 +-
 .../hdfs/server/federation/MockNamenode.java       |  41 ++++++++
 .../router/TestRouterNamenodeMonitoring.java       | 109 ++++++++++++++++++++-
 4 files changed, 153 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 78e5855..b7c2b03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -949,10 +949,13 @@ public class RouterClientProtocol implements ClientProtocol {
       for (DatanodeStorageReport dn : dns) {
         DatanodeInfo dnInfo = dn.getDatanodeInfo();
         String nodeId = dnInfo.getXferAddr();
-        if (!datanodesMap.containsKey(nodeId)) {
+        DatanodeStorageReport oldDn = datanodesMap.get(nodeId);
+        if (oldDn == null ||
+            dnInfo.getLastUpdate() > oldDn.getDatanodeInfo().getLastUpdate()) {
           datanodesMap.put(nodeId, dn);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
         }
-        // TODO merge somehow, right now it just takes the first one
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index f33d1d3..2ae78c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -886,7 +886,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
       DatanodeInfo[] result = entry.getValue();
       for (DatanodeInfo node : result) {
         String nodeId = node.getXferAddr();
-        if (!datanodesMap.containsKey(nodeId)) {
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
           // Add the subcluster as a suffix to the network location
           node.setNetworkLocation(
               NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 8b5fb54..97428c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
@@ -80,8 +81,11 @@ import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -119,6 +123,8 @@ public class MockNamenode {
   private String nsId;
   /** HA state of the Namenode. */
   private HAServiceState haState = HAServiceState.STANDBY;
+  /** Datanodes registered in this Namenode. */
+  private List<DatanodeInfo> dns = new ArrayList<>();
 
   /** RPC server of the Namenode that redirects calls to the mock. */
   private Server rpcServer;
@@ -295,6 +301,14 @@ public class MockNamenode {
   }
 
   /**
+   * Get the datanodes that this NN will return in its datanode reports.
+   * @return The internal list itself (not a copy), so callers can add to it.
+   */
+  public List<DatanodeInfo> getDatanodes() {
+    return this.dns;
+  }
+
+  /**
    * Stop the Mock Namenode. It stops all the servers.
    * @throws Exception If it cannot stop the Namenode.
    */
@@ -452,6 +466,33 @@ public class MockNamenode {
     });
   }
 
+  /**
+   * Mock both datanode report calls so they serve the registered datanodes.
+   * @throws IOException If it cannot be setup.
+   */
+  public void addDatanodeMock() throws IOException {
+    when(mockNn.getDatanodeReport(any(DatanodeReportType.class))).thenAnswer(
+        invocation -> {
+          LOG.info("{} getDatanodeReport({})", nsId, invocation.getArgument(0));
+          return dns.toArray(new DatanodeInfo[0]);
+        });
+    when(mockNn.getDatanodeStorageReport(any(DatanodeReportType.class)))
+        .thenAnswer(invocation -> {
+          LOG.info("{} getDatanodeStorageReport({})",
+              nsId, invocation.getArgument(0));
+          DatanodeStorageReport[] ret = new DatanodeStorageReport[dns.size()];
+          for (int i = 0; i < dns.size(); i++) {
+            DatanodeInfo dn = dns.get(i);
+            DatanodeStorage storage = new DatanodeStorage(dn.getName());
+            StorageReport[] storageReports = new StorageReport[] {
+                new StorageReport(storage, false, 0L, 0L, 0L, 0L, 0L)
+            };
+            ret[i] = new DatanodeStorageReport(dn, storageReports);
+          }
+          return ret;
+        });
+  }
+
   private static String getSrc(InvocationOnMock invocation) {
     return (String) invocation.getArguments()[0];
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 9fcfcb4..b5f5b67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
+import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,9 +35,15 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -42,6 +52,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -67,6 +78,8 @@ public class TestRouterNamenodeMonitoring {
   private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
   /** Nameservices in the federated cluster. */
   private List<String> nsIds = asList("ns0", "ns1");
+  /** Namenodes in the cluster. */
+  private List<String> nnIds = asList("nn0", "nn1");
 
   /** Time the test starts. */
   private long initializedTime;
@@ -77,7 +90,7 @@ public class TestRouterNamenodeMonitoring {
     LOG.info("Initialize the Mock Namenodes to monitor");
     for (String nsId : nsIds) {
       nns.put(nsId, new HashMap<>());
-      for (String nnId : asList("nn0", "nn1")) {
+      for (String nnId : nnIds) {
         nns.get(nsId).put(nnId, new MockNamenode(nsId));
       }
     }
@@ -115,14 +128,14 @@ public class TestRouterNamenodeMonitoring {
     conf.set(DFSConfigKeys.DFS_NAMESERVICES,
         StringUtils.join(",", nns.keySet()));
     for (String nsId : nns.keySet()) {
-      Set<String> nnIds = nns.get(nsId).keySet();
+      Set<String> nsNnIds = nns.get(nsId).keySet();
 
       StringBuilder sb = new StringBuilder();
       sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
       sb.append(".").append(nsId);
-      conf.set(sb.toString(), StringUtils.join(",", nnIds));
+      conf.set(sb.toString(), StringUtils.join(",", nsNnIds));
 
-      for (String nnId : nnIds) {
+      for (String nnId : nsNnIds) {
         final MockNamenode nn = nns.get(nsId).get(nnId);
 
         sb = new StringBuilder();
@@ -314,4 +327,92 @@ public class TestRouterNamenodeMonitoring {
       assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
     }
   }
+
+  /**
+   * Test the view of the Datanodes that the Router sees. If a Datanode is
+   * registered in two subclusters, the Router should return the report with
+   * the latest lastUpdate, whichever subcluster it comes from.
+   * @throws IOException If the test cannot run.
+   */
+  @Test
+  public void testDatanodesView() throws IOException {
+
+    // Build and start a Router configured with stateStore() and rpc()
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .rpc()
+        .build();
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    // Register the mock NNs of each nameservice; only nn0 is made active
+    for (String nsId : nsIds) {
+      registerSubclusters(router, nns.get(nsId).values());
+      for (String nnId : nnIds) {
+        MockNamenode nn = nns.get(nsId).get(nnId);
+        if ("nn0".equals(nnId)) {
+          nn.transitionToActive();
+        }
+        nn.addDatanodeMock();
+      }
+    }
+
+    // Give dn0 and dn1 a different state and lastUpdate in each nameservice
+    long time = Time.now();
+    for (String nsId : nsIds) {
+      for (String nnId : nnIds) {
+        // dn0: ns1 has the latest update (time - 500) and is DECOMMISSIONED
+        DatanodeInfoBuilder dn0Builder = new DatanodeInfoBuilder()
+            .setDatanodeUuid("dn0")
+            .setHostName("dn0")
+            .setIpAddr("dn0")
+            .setXferPort(10000);
+        if ("ns0".equals(nsId)) {
+          dn0Builder.setLastUpdate(time - 1000);
+          dn0Builder.setAdminState(AdminStates.NORMAL);
+        } else if ("ns1".equals(nsId)) {
+          dn0Builder.setLastUpdate(time - 500);
+          dn0Builder.setAdminState(AdminStates.DECOMMISSIONED);
+        }
+
+        // dn1: ns0 has the latest update (time - 1000) and is NORMAL
+        DatanodeInfoBuilder dn1Builder = new DatanodeInfoBuilder()
+            .setDatanodeUuid("dn1")
+            .setHostName("dn1")
+            .setIpAddr("dn1")
+            .setXferPort(10000);
+        if ("ns0".equals(nsId)) {
+          dn1Builder.setLastUpdate(time - 1000);
+          dn1Builder.setAdminState(AdminStates.NORMAL);
+        } else if ("ns1".equals(nsId)) {
+          dn1Builder.setLastUpdate(time - 5 * 1000);
+          dn1Builder.setAdminState(AdminStates.DECOMMISSION_INPROGRESS);
+        }
+
+        // Add both DN views to the list this mock NN serves in its reports
+        MockNamenode nn = nns.get(nsId).get(nnId);
+        List<DatanodeInfo> dns = nn.getDatanodes();
+        dns.add(dn0Builder.build());
+        dns.add(dn1Builder.build());
+      }
+    }
+
+    // Ask the Router for the storage report: one entry per DN, latest state
+    DistributedFileSystem dfs = (DistributedFileSystem)getFileSystem(router);
+    DFSClient dfsClient = dfs.getClient();
+    DatanodeStorageReport[] dns = dfsClient.getDatanodeStorageReport(
+        DatanodeReportType.ALL);
+    assertEquals(2, dns.length);
+    for (DatanodeStorageReport dn : dns) {
+      DatanodeInfo dnInfo = dn.getDatanodeInfo();
+      if ("dn0".equals(dnInfo.getHostName())) {
+        assertEquals(AdminStates.DECOMMISSIONED, dnInfo.getAdminState());
+      } else if ("dn1".equals(dnInfo.getHostName())) {
+        assertEquals(AdminStates.NORMAL, dnInfo.getAdminState());
+      } else {
+        fail("Unexpected DN: " + dnInfo.getHostName());
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org