Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 16:40:16 UTC

[hadoop] branch branch-3.3 updated: Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 53d22fd  Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
53d22fd is described below

commit 53d22fdb88c60f43d8674348529d18917fcf6e39
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon May 18 09:39:57 2020 -0700

    Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
    
    This reverts commit acae31aa2824bf50fe9291148357b4f5c76a9329.
---
 .../hadoop/hdfs/server/balancer/Balancer.java      | 39 ++++------
 .../hdfs/server/balancer/NameNodeConnector.java    | 85 ----------------------
 .../balancer/TestBalancerWithHANameNodes.java      | 56 +-------------
 3 files changed, 17 insertions(+), 163 deletions(-)
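
For context, the reverted HDFS-13183 feature let the Balancer serve its getBlocks
calls from a Standby NameNode instead of the Active one, gated by a stale-read
flag. A minimal sketch of how an operator would have enabled it, assuming only
the key name visible in the removed hunks below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnableStaleReads {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Flag seen in the removed NameNodeConnector hunk below; when true,
        // the Balancer could route getBlocks to a Standby NameNode.
        conf.setBoolean(DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY, true);
      }
    }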

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index f643590..e8b4971 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -689,7 +688,7 @@ public class Balancer {
    * execute a {@link Balancer} to work through all datanodes once.  
    */
   static private int doBalance(Collection<URI> namenodes,
-      Collection<String> nsIds, final BalancerParameters p, Configuration conf)
+      final BalancerParameters p, Configuration conf)
       throws IOException, InterruptedException {
     final long sleeptime =
         conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -708,12 +707,13 @@ public class Balancer {
     System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
     
     List<NameNodeConnector> connectors = Collections.emptyList();
-    boolean done = false;
-    for(int iteration = 0; !done; iteration++) {
-      try {
-        connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
-            Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
-            p.getMaxIdleIteration());
+    try {
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
+              Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
+              p.getMaxIdleIteration());
+
+      boolean done = false;
+      for(int iteration = 0; !done; iteration++) {
         done = true;
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
@@ -741,25 +741,19 @@ public class Balancer {
         if (!done) {
           Thread.sleep(sleeptime);
         }
-      } finally {
-        for(NameNodeConnector nnc : connectors) {
-          IOUtils.cleanupWithLogger(LOG, nnc);
-        }
+      }
+    } finally {
+      for(NameNodeConnector nnc : connectors) {
+        IOUtils.cleanupWithLogger(LOG, nnc);
       }
     }
     return ExitStatus.SUCCESS.getExitCode();
   }
 
   static int run(Collection<URI> namenodes, final BalancerParameters p,
-                 Configuration conf) throws IOException, InterruptedException {
-    return run(namenodes, null, p, conf);
-  }
-
-  static int run(Collection<URI> namenodes, Collection<String> nsIds,
-      final BalancerParameters p, Configuration conf)
-      throws IOException, InterruptedException {
+      Configuration conf) throws IOException, InterruptedException {
     if (!p.getRunAsService()) {
-      return doBalance(namenodes, nsIds, p, conf);
+      return doBalance(namenodes, p, conf);
     }
     if (!serviceRunning) {
       serviceRunning = true;
@@ -778,7 +772,7 @@ public class Balancer {
 
     while (serviceRunning) {
       try {
-        int retCode = doBalance(namenodes, nsIds, p, conf);
+        int retCode = doBalance(namenodes, p, conf);
         if (retCode < 0) {
           LOG.info("Balance failed, error code: " + retCode);
           failedTimesSinceLastSuccessfulBalance++;
@@ -862,8 +856,7 @@ public class Balancer {
         checkReplicationPolicyCompatibility(conf);
 
         final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-        final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
-        return Balancer.run(namenodes, nsIds, parse(args), conf);
+        return Balancer.run(namenodes, parse(args), conf);
       } catch (IOException e) {
         System.out.println(e + ".  Exiting ...");
         return ExitStatus.IO_EXCEPTION.getExitCode();
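
The hunks above also restore the pre-HDFS-13183 shape of doBalance(): connectors
are created once before the iteration loop, reused across iterations, and closed
in a single outer finally, rather than being rebuilt and cleaned up inside each
iteration. A standalone sketch of that post-revert control flow, using a
hypothetical Connector stand-in rather than the real NameNodeConnector:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for NameNodeConnector, just to make the
    // control flow compilable outside Hadoop.
    class Connector implements Closeable {
      boolean doOneIteration() { return true; } // true == no more work
      @Override public void close() {}
    }

    public class BalanceLoopSketch {
      static int doBalance(List<String> namenodes) throws IOException {
        List<Connector> connectors = new ArrayList<>();
        try {
          for (String nn : namenodes) {
            connectors.add(new Connector()); // built once, reused per iteration
          }
          boolean done = false;
          while (!done) {
            done = true;
            for (Connector c : connectors) {
              done &= c.doOneIteration(); // stay in the loop if any has work
            }
          }
        } finally {
          for (Connector c : connectors) {
            c.close(); // single cleanup point once the loop exits
          }
        }
        return 0;
      }
    }

Building the connectors once also means a failure partway through construction
still reaches the single finally block, which closes whatever made it into the
list.
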
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 8403f82..2844ad5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,7 +25,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,12 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.RateLimiter;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -106,32 +100,6 @@ public class NameNodeConnector implements Closeable {
     return connectors;
   }
 
-  public static List<NameNodeConnector> newNameNodeConnectors(
-      Collection<URI> namenodes, Collection<String> nsIds, String name,
-      Path idPath, Configuration conf, int maxIdleIterations)
-      throws IOException {
-    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
-        namenodes.size());
-    Map<URI, String> uriToNsId = new HashMap<>();
-    if (nsIds != null) {
-      for (URI uri : namenodes) {
-        for (String nsId : nsIds) {
-          if (uri.getAuthority().equals(nsId)) {
-            uriToNsId.put(uri, nsId);
-          }
-        }
-      }
-    }
-    for (URI uri : namenodes) {
-      String nsId = uriToNsId.get(uri);
-      NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
-          null, conf, maxIdleIterations);
-      nnc.getKeyManager().startBlockKeyUpdater();
-      connectors.add(nnc);
-    }
-    return connectors;
-  }
-
   @VisibleForTesting
   public static void setWrite2IdFile(boolean write2IdFile) {
     NameNodeConnector.write2IdFile = write2IdFile;
@@ -146,13 +114,6 @@ public class NameNodeConnector implements Closeable {
   private final String blockpoolID;
 
   private final BalancerProtocols namenode;
-  /**
-   * If set balancerShouldRequestStandby true, Balancer will getBlocks from
-   * Standby NameNode only and it can reduce the performance impact of Active
-   * NameNode, especially in a busy HA mode cluster.
-   */
-  private boolean balancerShouldRequestStandby;
-  private NamenodeProtocol standbyNameNode;
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
@@ -188,11 +149,6 @@ public class NameNodeConnector implements Closeable {
 
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
-    this.balancerShouldRequestStandby = conf.getBoolean(
-        DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
-        DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
-    this.standbyNameNode = null;
-
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -211,31 +167,6 @@ public class NameNodeConnector implements Closeable {
     }
   }
 
-  public NameNodeConnector(String name, URI nameNodeUri, String nsId,
-                           Path idPath, List<Path> targetPaths,
-                           Configuration conf, int maxNotChangedIterations)
-      throws IOException {
-    this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
-    if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
-      List<ClientProtocol> namenodes =
-          HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
-      for (ClientProtocol proxy : namenodes) {
-        try {
-          if (proxy.getHAServiceState().equals(
-              HAServiceProtocol.HAServiceState.STANDBY)) {
-            this.standbyNameNode = NameNodeProxies.createNonHAProxy(
-                conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
-                UserGroupInformation.getCurrentUser(), false).getProxy();
-            break;
-          }
-        } catch (Exception e) {
-          //Ignore the exception while connecting to a namenode.
-          LOG.debug("Error while connecting to namenode", e);
-        }
-      }
-    }
-  }
-
   public DistributedFileSystem getDistributedFileSystem() {
     return fs;
   }
@@ -255,22 +186,6 @@ public class NameNodeConnector implements Closeable {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
-    boolean isRequestStandby = true;
-    try {
-      if (balancerShouldRequestStandby && standbyNameNode != null) {
-        return standbyNameNode.getBlocks(datanode, size, minBlockSize);
-      } else {
-        isRequestStandby = false;
-      }
-    } catch (Exception e) {
-      LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
-          "will fallback to normal way", e);
-      isRequestStandby = false;
-    } finally {
-      if (isRequestStandby) {
-        LOG.info("Request #getBlocks to Standby NameNode success.");
-      }
-    }
     return namenode.getBlocks(datanode, size, minBlockSize);
   }
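
The removed getBlocks() logic above implemented a standby-first pattern: if the
stale-read flag was set and a Standby proxy had been found, the request went to
the Standby, and any failure fell back to the Active NameNode. A minimal sketch
of that pattern under hypothetical BlockSource types (not the real
NamenodeProtocol proxies):

    import java.util.List;

    // Hypothetical stand-in; the real code uses NamenodeProtocol proxies.
    interface BlockSource {
      List<String> getBlocks(String datanode, long size) throws Exception;
    }

    public class StandbyFirstSketch {
      private final BlockSource active;
      private final BlockSource standby;     // may be null if none was found
      private final boolean allowStaleReads; // the stale-read config flag

      StandbyFirstSketch(BlockSource active, BlockSource standby,
                         boolean allowStaleReads) {
        this.active = active;
        this.standby = standby;
        this.allowStaleReads = allowStaleReads;
      }

      List<String> getBlocks(String datanode, long size) throws Exception {
        if (allowStaleReads && standby != null) {
          try {
            // Serve the read-only, possibly slightly stale request from the
            // Standby to keep load off the Active NameNode.
            return standby.getBlocks(datanode, size);
          } catch (Exception e) {
            // Fall through to the Active on any Standby failure, mirroring
            // the removed "will fallback to normal way" branch.
          }
        }
        return active.getBlocks(datanode, size);
      }
    }

The trade-off, as the original JIRA title suggests, is reduced load on the
Active NameNode in exchange for reading a namespace view that may lag slightly
behind it.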
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 185df12..c604315 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -17,11 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.times;
@@ -35,7 +31,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -49,9 +44,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Test;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test balancer with HA NameNodes
@@ -113,12 +106,6 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
         / numOfDatanodes, (short) numOfDatanodes, 0);
 
-    boolean isRequestStandby = conf.getBoolean(
-        DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
-    if (isRequestStandby) {
-      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
-          cluster.getNameNode(1));
-    }
     // start up an empty node with the same capacity and on the same rack
     long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
     String newNodeRack = TestBalancer.RACK2; // new node's rack
@@ -128,55 +115,14 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
         cluster);
     Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
     assertEquals(1, namenodes.size());
-    final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
-        conf);
+    final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
     assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
         cluster, BalancerParameters.DEFAULT);
   }
 
   /**
-   * Test Balancer request Standby NameNode when enable this feature.
-   */
-  @Test(timeout = 60000)
-  public void testBalancerRequestSBNWithHA() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
-    conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-    //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
-    TestBalancer.initConf(conf);
-    assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
-    NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
-    nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
-    Configuration copiedConf = new Configuration(conf);
-    cluster = new MiniDFSCluster.Builder(copiedConf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(TEST_CAPACITIES.length)
-        .racks(TEST_RACKS)
-        .simulatedCapacities(TEST_CAPACITIES)
-        .build();
-    // Try capture NameNodeConnector log.
-    LogCapturer log =LogCapturer.captureLogs(
-        LoggerFactory.getLogger(NameNodeConnector.class));
-    HATestUtil.setFailoverConfigurations(cluster, conf);
-    try {
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-      Thread.sleep(500);
-      client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
-          ClientProtocol.class).getProxy();
-      doTest(conf);
-      // Check getBlocks request to Standby NameNode.
-      assertTrue(log.getOutput().contains(
-          "Request #getBlocks to Standby NameNode success."));
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
    * Test Balancer with ObserverNodes.
    */
   @Test(timeout = 120000)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org