Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 14:10:55 UTC

[hadoop] branch branch-3.3 updated: HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new acae31a  HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
acae31a is described below

commit acae31aa2824bf50fe9291148357b4f5c76a9329
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Mon May 18 07:08:32 2020 -0700

    HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit a3f44dacc1fa19acc4eefd1e2505e54f8629e603)
---
 .../hadoop/hdfs/server/balancer/Balancer.java      | 39 ++++++----
 .../hdfs/server/balancer/NameNodeConnector.java    | 85 ++++++++++++++++++++++
 .../balancer/TestBalancerWithHANameNodes.java      | 56 +++++++++++++-
 3 files changed, 163 insertions(+), 17 deletions(-)
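
For context before the diffs: the change is gated on the existing stale-read
switch. When DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY is true (it is off by
default, per DFS_HA_ALLOW_STALE_READ_DEFAULT), the Balancer's
NameNodeConnector serves its getBlocks traffic from the Standby NameNode
instead of the Active. A minimal sketch of enabling it before starting the
Balancer; the wrapper class here is illustrative only, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class BalancerStaleReadConf {
      public static Configuration create() {
        Configuration conf = new HdfsConfiguration();
        // Off by default; turning it on lets NameNodeConnector route the
        // Balancer's getBlocks calls to the Standby NameNode.
        conf.setBoolean(DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY, true);
        return conf;
      }
    }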

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e8b4971..f643590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -688,7 +689,7 @@ public class Balancer {
    * execute a {@link Balancer} to work through all datanodes once.  
    */
   static private int doBalance(Collection<URI> namenodes,
-      final BalancerParameters p, Configuration conf)
+      Collection<String> nsIds, final BalancerParameters p, Configuration conf)
       throws IOException, InterruptedException {
     final long sleeptime =
         conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -707,13 +708,12 @@ public class Balancer {
     System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
     
     List<NameNodeConnector> connectors = Collections.emptyList();
-    try {
-      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
-              Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
-              p.getMaxIdleIteration());
-
-      boolean done = false;
-      for(int iteration = 0; !done; iteration++) {
+    boolean done = false;
+    for(int iteration = 0; !done; iteration++) {
+      try {
+        connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
+            Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
+            p.getMaxIdleIteration());
         done = true;
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
@@ -741,19 +741,25 @@ public class Balancer {
         if (!done) {
           Thread.sleep(sleeptime);
         }
-      }
-    } finally {
-      for(NameNodeConnector nnc : connectors) {
-        IOUtils.cleanupWithLogger(LOG, nnc);
+      } finally {
+        for(NameNodeConnector nnc : connectors) {
+          IOUtils.cleanupWithLogger(LOG, nnc);
+        }
       }
     }
     return ExitStatus.SUCCESS.getExitCode();
   }
 
   static int run(Collection<URI> namenodes, final BalancerParameters p,
-      Configuration conf) throws IOException, InterruptedException {
+                 Configuration conf) throws IOException, InterruptedException {
+    return run(namenodes, null, p, conf);
+  }
+
+  static int run(Collection<URI> namenodes, Collection<String> nsIds,
+      final BalancerParameters p, Configuration conf)
+      throws IOException, InterruptedException {
     if (!p.getRunAsService()) {
-      return doBalance(namenodes, p, conf);
+      return doBalance(namenodes, nsIds, p, conf);
     }
     if (!serviceRunning) {
       serviceRunning = true;
@@ -772,7 +778,7 @@ public class Balancer {
 
     while (serviceRunning) {
       try {
-        int retCode = doBalance(namenodes, p, conf);
+        int retCode = doBalance(namenodes, nsIds, p, conf);
         if (retCode < 0) {
           LOG.info("Balance failed, error code: " + retCode);
           failedTimesSinceLastSuccessfulBalance++;
@@ -856,7 +862,8 @@ public class Balancer {
         checkReplicationPolicyCompatibility(conf);
 
         final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-        return Balancer.run(namenodes, parse(args), conf);
+        final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
+        return Balancer.run(namenodes, nsIds, parse(args), conf);
       } catch (IOException e) {
         System.out.println(e + ".  Exiting ...");
         return ExitStatus.IO_EXCEPTION.getExitCode();
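
The Balancer.java half of the change threads the nameservice IDs from main()
down into doBalance() and moves connector creation and cleanup inside the
iteration loop, so each pass re-resolves which NameNode is currently Standby.
A hedged sketch of the new entry path, assuming a Configuration "conf" with
the cluster's nameservices defined (all names appear in the hunks above;
run() is package-private, so external callers go through main()):

    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
    // The old run(namenodes, p, conf) overload still works: it delegates
    // here with nsIds == null, so existing callers are unaffected.
    int exitCode = Balancer.run(namenodes, nsIds,
        BalancerParameters.DEFAULT, conf);
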
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 2844ad5..8403f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,7 +33,12 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.RateLimiter;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -100,6 +106,32 @@ public class NameNodeConnector implements Closeable {
     return connectors;
   }
 
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Collection<URI> namenodes, Collection<String> nsIds, String name,
+      Path idPath, Configuration conf, int maxIdleIterations)
+      throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    Map<URI, String> uriToNsId = new HashMap<>();
+    if (nsIds != null) {
+      for (URI uri : namenodes) {
+        for (String nsId : nsIds) {
+          if (uri.getAuthority().equals(nsId)) {
+            uriToNsId.put(uri, nsId);
+          }
+        }
+      }
+    }
+    for (URI uri : namenodes) {
+      String nsId = uriToNsId.get(uri);
+      NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
+          null, conf, maxIdleIterations);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
+
   @VisibleForTesting
   public static void setWrite2IdFile(boolean write2IdFile) {
     NameNodeConnector.write2IdFile = write2IdFile;
@@ -114,6 +146,13 @@ public class NameNodeConnector implements Closeable {
   private final String blockpoolID;
 
   private final BalancerProtocols namenode;
+  /**
+   * If balancerShouldRequestStandby is set to true, the Balancer sends
+   * getBlocks requests to the Standby NameNode only, which reduces the load
+   * on the Active NameNode, especially in a busy HA cluster.
+   */
+  private boolean balancerShouldRequestStandby;
+  private NamenodeProtocol standbyNameNode;
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
@@ -149,6 +188,11 @@ public class NameNodeConnector implements Closeable {
 
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
+    this.balancerShouldRequestStandby = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
+        DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
+    this.standbyNameNode = null;
+
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -167,6 +211,31 @@ public class NameNodeConnector implements Closeable {
     }
   }
 
+  public NameNodeConnector(String name, URI nameNodeUri, String nsId,
+                           Path idPath, List<Path> targetPaths,
+                           Configuration conf, int maxNotChangedIterations)
+      throws IOException {
+    this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
+    if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
+      List<ClientProtocol> namenodes =
+          HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
+      for (ClientProtocol proxy : namenodes) {
+        try {
+          if (proxy.getHAServiceState().equals(
+              HAServiceProtocol.HAServiceState.STANDBY)) {
+            this.standbyNameNode = NameNodeProxies.createNonHAProxy(
+                conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
+                UserGroupInformation.getCurrentUser(), false).getProxy();
+            break;
+          }
+        } catch (Exception e) {
+          // Ignore the exception while connecting to a namenode.
+          LOG.debug("Error while connecting to namenode", e);
+        }
+      }
+    }
+  }
+
   public DistributedFileSystem getDistributedFileSystem() {
     return fs;
   }
@@ -186,6 +255,22 @@ public class NameNodeConnector implements Closeable {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
+    boolean isRequestStandby = true;
+    try {
+      if (balancerShouldRequestStandby && standbyNameNode != null) {
+        return standbyNameNode.getBlocks(datanode, size, minBlockSize);
+      } else {
+        isRequestStandby = false;
+      }
+    } catch (Exception e) {
+      LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
+          "will fallback to normal way", e);
+      isRequestStandby = false;
+    } finally {
+      if (isRequestStandby) {
+        LOG.info("Request #getBlocks to Standby NameNode success.");
+      }
+    }
     return namenode.getBlocks(datanode, size, minBlockSize);
   }
 
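On the NameNodeConnector side, the new constructor matches the connector's
URI to an HA-enabled nameservice, probes each NameNode's HA state, and keeps
a non-HA NamenodeProtocol proxy to whichever one reports STANDBY. getBlocks()
then prefers that proxy and falls back to the regular proxy (the Active) on
any failure. A simplified sketch of that fallback, with rate limiting and
logging elided (not the verbatim patch code):

    try {
      if (balancerShouldRequestStandby && standbyNameNode != null) {
        // Serve the block list from the Standby, sparing the Active.
        return standbyNameNode.getBlocks(datanode, size, minBlockSize);
      }
    } catch (Exception e) {
      // Any failure against the Standby is non-fatal; fall through to the
      // HA proxy below, which resolves to the Active NameNode.
    }
    return namenode.getBlocks(datanode, size, minBlockSize);
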
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index c604315..185df12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.times;
@@ -31,6 +35,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -44,7 +49,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test balancer with HA NameNodes
@@ -106,6 +113,12 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
         / numOfDatanodes, (short) numOfDatanodes, 0);
 
+    boolean isRequestStandby = conf.getBoolean(
+        DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
+    if (isRequestStandby) {
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+    }
     // start up an empty node with the same capacity and on the same rack
     long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
     String newNodeRack = TestBalancer.RACK2; // new node's rack
@@ -115,14 +128,55 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
         cluster);
     Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
     assertEquals(1, namenodes.size());
-    final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
+        conf);
     assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
         cluster, BalancerParameters.DEFAULT);
   }
 
   /**
+   * Test that the Balancer requests the Standby NameNode when enabled.
+   */
+  @Test(timeout = 60000)
+  public void testBalancerRequestSBNWithHA() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
+    conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
+    TestBalancer.initConf(conf);
+    assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
+    NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
+    nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+    Configuration copiedConf = new Configuration(conf);
+    cluster = new MiniDFSCluster.Builder(copiedConf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(TEST_CAPACITIES.length)
+        .racks(TEST_RACKS)
+        .simulatedCapacities(TEST_CAPACITIES)
+        .build();
+    // Capture the NameNodeConnector log to verify the Standby code path.
+    LogCapturer log = LogCapturer.captureLogs(
+        LoggerFactory.getLogger(NameNodeConnector.class));
+    HATestUtil.setFailoverConfigurations(cluster, conf);
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      Thread.sleep(500);
+      client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
+          ClientProtocol.class).getProxy();
+      doTest(conf);
+      // Check that getBlocks was served by the Standby NameNode.
+      assertTrue(log.getOutput().contains(
+          "getBlocks request to Standby NameNode succeeded."));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
    * Test Balancer with ObserverNodes.
    */
   @Test(timeout = 120000)
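
The new test follows a log-capture pattern: enable the flag, stand up an HA
mini cluster, transition nn0 to active, run a normal balancing pass, and
assert that the connector logged a Standby-served getBlocks call. The
essential shape, with cluster setup elided (names as in the test above):

    LogCapturer log = LogCapturer.captureLogs(
        LoggerFactory.getLogger(NameNodeConnector.class));
    // ... cluster.transitionToActive(0), then run the balancer via
    // doTest(conf) ...
    assertTrue(log.getOutput().contains(
        "getBlocks request to Standby NameNode succeeded."));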


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org