You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 14:10:55 UTC
[hadoop] branch branch-3.3 updated: HDFS-13183. Standby NameNode
process getBlocks request to reduce Active load. Contributed by Xiaoqiao
He.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new acae31a HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
acae31a is described below
commit acae31aa2824bf50fe9291148357b4f5c76a9329
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Mon May 18 07:08:32 2020 -0700
HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
(cherry picked from commit a3f44dacc1fa19acc4eefd1e2505e54f8629e603)
---
.../hadoop/hdfs/server/balancer/Balancer.java | 39 ++++++----
.../hdfs/server/balancer/NameNodeConnector.java | 85 ++++++++++++++++++++++
.../balancer/TestBalancerWithHANameNodes.java | 56 +++++++++++++-
3 files changed, 163 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e8b4971..f643590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.DFSUtilClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -688,7 +689,7 @@ public class Balancer {
* execute a {@link Balancer} to work through all datanodes once.
*/
static private int doBalance(Collection<URI> namenodes,
- final BalancerParameters p, Configuration conf)
+ Collection<String> nsIds, final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime =
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -707,13 +708,12 @@ public class Balancer {
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
- try {
- connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
- Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
- p.getMaxIdleIteration());
-
- boolean done = false;
- for(int iteration = 0; !done; iteration++) {
+ boolean done = false;
+ for(int iteration = 0; !done; iteration++) {
+ try {
+ connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
+ Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
+ p.getMaxIdleIteration());
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
@@ -741,19 +741,25 @@ public class Balancer {
if (!done) {
Thread.sleep(sleeptime);
}
- }
- } finally {
- for(NameNodeConnector nnc : connectors) {
- IOUtils.cleanupWithLogger(LOG, nnc);
+ } finally {
+ for(NameNodeConnector nnc : connectors) {
+ IOUtils.cleanupWithLogger(LOG, nnc);
+ }
}
}
return ExitStatus.SUCCESS.getExitCode();
}
static int run(Collection<URI> namenodes, final BalancerParameters p,
- Configuration conf) throws IOException, InterruptedException {
+ Configuration conf) throws IOException, InterruptedException {
+ return run(namenodes, null, p, conf);
+ }
+
+ static int run(Collection<URI> namenodes, Collection<String> nsIds,
+ final BalancerParameters p, Configuration conf)
+ throws IOException, InterruptedException {
if (!p.getRunAsService()) {
- return doBalance(namenodes, p, conf);
+ return doBalance(namenodes, nsIds, p, conf);
}
if (!serviceRunning) {
serviceRunning = true;
@@ -772,7 +778,7 @@ public class Balancer {
while (serviceRunning) {
try {
- int retCode = doBalance(namenodes, p, conf);
+ int retCode = doBalance(namenodes, nsIds, p, conf);
if (retCode < 0) {
LOG.info("Balance failed, error code: " + retCode);
failedTimesSinceLastSuccessfulBalance++;
@@ -856,7 +862,8 @@ public class Balancer {
checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
- return Balancer.run(namenodes, parse(args), conf);
+ final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
+ return Balancer.run(namenodes, nsIds, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 2844ad5..8403f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,6 +25,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,7 +33,12 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -100,6 +106,32 @@ public class NameNodeConnector implements Closeable {
return connectors;
}
+ public static List<NameNodeConnector> newNameNodeConnectors(
+ Collection<URI> namenodes, Collection<String> nsIds, String name,
+ Path idPath, Configuration conf, int maxIdleIterations)
+ throws IOException {
+ final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+ namenodes.size());
+ Map<URI, String> uriToNsId = new HashMap<>();
+ if (nsIds != null) {
+ for (URI uri : namenodes) {
+ for (String nsId : nsIds) {
+ if (uri.getAuthority().equals(nsId)) {
+ uriToNsId.put(uri, nsId);
+ }
+ }
+ }
+ }
+ for (URI uri : namenodes) {
+ String nsId = uriToNsId.get(uri);
+ NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
+ null, conf, maxIdleIterations);
+ nnc.getKeyManager().startBlockKeyUpdater();
+ connectors.add(nnc);
+ }
+ return connectors;
+ }
+
@VisibleForTesting
public static void setWrite2IdFile(boolean write2IdFile) {
NameNodeConnector.write2IdFile = write2IdFile;
@@ -114,6 +146,13 @@ public class NameNodeConnector implements Closeable {
private final String blockpoolID;
private final BalancerProtocols namenode;
+ /**
+ * If balancerShouldRequestStandby is true and a Standby NameNode proxy is
+ * available, getBlocks is sent to the Standby first, falling back to the
+ * regular proxy on failure; this reduces load on a busy Active NameNode.
+ */
+ private boolean balancerShouldRequestStandby;
+ private NamenodeProtocol standbyNameNode;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@@ -149,6 +188,11 @@ public class NameNodeConnector implements Closeable {
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
+ this.balancerShouldRequestStandby = conf.getBoolean(
+ DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
+ DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
+ this.standbyNameNode = null;
+
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -167,6 +211,31 @@ public class NameNodeConnector implements Closeable {
}
}
+ public NameNodeConnector(String name, URI nameNodeUri, String nsId,
+ Path idPath, List<Path> targetPaths,
+ Configuration conf, int maxNotChangedIterations)
+ throws IOException {
+ this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
+ if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
+ List<ClientProtocol> namenodes =
+ HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
+ for (ClientProtocol proxy : namenodes) {
+ try {
+ if (proxy.getHAServiceState().equals(
+ HAServiceProtocol.HAServiceState.STANDBY)) {
+ this.standbyNameNode = NameNodeProxies.createNonHAProxy(
+ conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
+ UserGroupInformation.getCurrentUser(), false).getProxy();
+ break;
+ }
+ } catch (Exception e) {
+ //Ignore the exception while connecting to a namenode.
+ LOG.debug("Error while connecting to namenode", e);
+ }
+ }
+ }
+ }
+
public DistributedFileSystem getDistributedFileSystem() {
return fs;
}
@@ -186,6 +255,22 @@ public class NameNodeConnector implements Closeable {
if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
+ boolean isRequestStandby = true;
+ try {
+ if (balancerShouldRequestStandby && standbyNameNode != null) {
+ return standbyNameNode.getBlocks(datanode, size, minBlockSize);
+ } else {
+ isRequestStandby = false;
+ }
+ } catch (Exception e) {
+ LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
+ "will fallback to normal way", e);
+ isRequestStandby = false;
+ } finally {
+ if (isRequestStandby) {
+ LOG.info("Request #getBlocks to Standby NameNode success.");
+ }
+ }
return namenode.getBlocks(datanode, size, minBlockSize);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index c604315..185df12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
@@ -31,6 +35,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -44,7 +49,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
/**
* Test balancer with HA NameNodes
@@ -106,6 +113,12 @@ public class TestBalancerWithHANameNodes {
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
/ numOfDatanodes, (short) numOfDatanodes, 0);
+ boolean isRequestStandby = conf.getBoolean(
+ DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
+ if (isRequestStandby) {
+ HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+ cluster.getNameNode(1));
+ }
// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
@@ -115,14 +128,55 @@ public class TestBalancerWithHANameNodes {
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+ Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
assertEquals(1, namenodes.size());
- final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+ final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
+ conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
}
/**
+ * Test that Balancer requests Standby NameNode when the feature is enabled.
+ */
+ @Test(timeout = 60000)
+ public void testBalancerRequestSBNWithHA() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
+ conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
+ TestBalancer.initConf(conf);
+ assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
+ NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
+ nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+ Configuration copiedConf = new Configuration(conf);
+ cluster = new MiniDFSCluster.Builder(copiedConf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(TEST_CAPACITIES.length)
+ .racks(TEST_RACKS)
+ .simulatedCapacities(TEST_CAPACITIES)
+ .build();
+ // Capture the NameNodeConnector log output.
+ LogCapturer log =LogCapturer.captureLogs(
+ LoggerFactory.getLogger(NameNodeConnector.class));
+ HATestUtil.setFailoverConfigurations(cluster, conf);
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ Thread.sleep(500);
+ client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
+ ClientProtocol.class).getProxy();
+ doTest(conf);
+ // Check getBlocks request to Standby NameNode.
+ assertTrue(log.getOutput().contains(
+ "Request #getBlocks to Standby NameNode success."));
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
* Test Balancer with ObserverNodes.
*/
@Test(timeout = 120000)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org