You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 16:40:16 UTC
[hadoop] branch branch-3.3 updated: Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 53d22fd Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
53d22fd is described below
commit 53d22fdb88c60f43d8674348529d18917fcf6e39
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon May 18 09:39:57 2020 -0700
Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
This reverts commit acae31aa2824bf50fe9291148357b4f5c76a9329.
---
.../hadoop/hdfs/server/balancer/Balancer.java | 39 ++++------
.../hdfs/server/balancer/NameNodeConnector.java | 85 ----------------------
.../balancer/TestBalancerWithHANameNodes.java | 56 +-------------
3 files changed, 17 insertions(+), 163 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index f643590..e8b4971 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -37,7 +37,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdfs.DFSUtilClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -689,7 +688,7 @@ public class Balancer {
* execute a {@link Balancer} to work through all datanodes once.
*/
static private int doBalance(Collection<URI> namenodes,
- Collection<String> nsIds, final BalancerParameters p, Configuration conf)
+ final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime =
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -708,12 +707,13 @@ public class Balancer {
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
- boolean done = false;
- for(int iteration = 0; !done; iteration++) {
- try {
- connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
- Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
- p.getMaxIdleIteration());
+ try {
+ connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+ Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
+ p.getMaxIdleIteration());
+
+ boolean done = false;
+ for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
@@ -741,25 +741,19 @@ public class Balancer {
if (!done) {
Thread.sleep(sleeptime);
}
- } finally {
- for(NameNodeConnector nnc : connectors) {
- IOUtils.cleanupWithLogger(LOG, nnc);
- }
+ }
+ } finally {
+ for(NameNodeConnector nnc : connectors) {
+ IOUtils.cleanupWithLogger(LOG, nnc);
}
}
return ExitStatus.SUCCESS.getExitCode();
}
static int run(Collection<URI> namenodes, final BalancerParameters p,
- Configuration conf) throws IOException, InterruptedException {
- return run(namenodes, null, p, conf);
- }
-
- static int run(Collection<URI> namenodes, Collection<String> nsIds,
- final BalancerParameters p, Configuration conf)
- throws IOException, InterruptedException {
+ Configuration conf) throws IOException, InterruptedException {
if (!p.getRunAsService()) {
- return doBalance(namenodes, nsIds, p, conf);
+ return doBalance(namenodes, p, conf);
}
if (!serviceRunning) {
serviceRunning = true;
@@ -778,7 +772,7 @@ public class Balancer {
while (serviceRunning) {
try {
- int retCode = doBalance(namenodes, nsIds, p, conf);
+ int retCode = doBalance(namenodes, p, conf);
if (retCode < 0) {
LOG.info("Balance failed, error code: " + retCode);
failedTimesSinceLastSuccessfulBalance++;
@@ -862,8 +856,7 @@ public class Balancer {
checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
- final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
- return Balancer.run(namenodes, nsIds, parse(args), conf);
+ return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 8403f82..2844ad5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,7 +25,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,12 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -106,32 +100,6 @@ public class NameNodeConnector implements Closeable {
return connectors;
}
- public static List<NameNodeConnector> newNameNodeConnectors(
- Collection<URI> namenodes, Collection<String> nsIds, String name,
- Path idPath, Configuration conf, int maxIdleIterations)
- throws IOException {
- final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
- namenodes.size());
- Map<URI, String> uriToNsId = new HashMap<>();
- if (nsIds != null) {
- for (URI uri : namenodes) {
- for (String nsId : nsIds) {
- if (uri.getAuthority().equals(nsId)) {
- uriToNsId.put(uri, nsId);
- }
- }
- }
- }
- for (URI uri : namenodes) {
- String nsId = uriToNsId.get(uri);
- NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
- null, conf, maxIdleIterations);
- nnc.getKeyManager().startBlockKeyUpdater();
- connectors.add(nnc);
- }
- return connectors;
- }
-
@VisibleForTesting
public static void setWrite2IdFile(boolean write2IdFile) {
NameNodeConnector.write2IdFile = write2IdFile;
@@ -146,13 +114,6 @@ public class NameNodeConnector implements Closeable {
private final String blockpoolID;
private final BalancerProtocols namenode;
- /**
- * If set balancerShouldRequestStandby true, Balancer will getBlocks from
- * Standby NameNode only and it can reduce the performance impact of Active
- * NameNode, especially in a busy HA mode cluster.
- */
- private boolean balancerShouldRequestStandby;
- private NamenodeProtocol standbyNameNode;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@@ -188,11 +149,6 @@ public class NameNodeConnector implements Closeable {
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
- this.balancerShouldRequestStandby = conf.getBoolean(
- DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
- DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
- this.standbyNameNode = null;
-
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -211,31 +167,6 @@ public class NameNodeConnector implements Closeable {
}
}
- public NameNodeConnector(String name, URI nameNodeUri, String nsId,
- Path idPath, List<Path> targetPaths,
- Configuration conf, int maxNotChangedIterations)
- throws IOException {
- this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
- if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
- List<ClientProtocol> namenodes =
- HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
- for (ClientProtocol proxy : namenodes) {
- try {
- if (proxy.getHAServiceState().equals(
- HAServiceProtocol.HAServiceState.STANDBY)) {
- this.standbyNameNode = NameNodeProxies.createNonHAProxy(
- conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
- UserGroupInformation.getCurrentUser(), false).getProxy();
- break;
- }
- } catch (Exception e) {
- //Ignore the exception while connecting to a namenode.
- LOG.debug("Error while connecting to namenode", e);
- }
- }
- }
- }
-
public DistributedFileSystem getDistributedFileSystem() {
return fs;
}
@@ -255,22 +186,6 @@ public class NameNodeConnector implements Closeable {
if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
- boolean isRequestStandby = true;
- try {
- if (balancerShouldRequestStandby && standbyNameNode != null) {
- return standbyNameNode.getBlocks(datanode, size, minBlockSize);
- } else {
- isRequestStandby = false;
- }
- } catch (Exception e) {
- LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
- "will fallback to normal way", e);
- isRequestStandby = false;
- } finally {
- if (isRequestStandby) {
- LOG.info("Request #getBlocks to Standby NameNode success.");
- }
- }
return namenode.getBlocks(datanode, size, minBlockSize);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 185df12..c604315 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -17,11 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
@@ -35,7 +31,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -49,9 +44,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
-import org.slf4j.LoggerFactory;
/**
* Test balancer with HA NameNodes
@@ -113,12 +106,6 @@ public class TestBalancerWithHANameNodes {
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
/ numOfDatanodes, (short) numOfDatanodes, 0);
- boolean isRequestStandby = conf.getBoolean(
- DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
- if (isRequestStandby) {
- HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
- cluster.getNameNode(1));
- }
// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
@@ -128,55 +115,14 @@ public class TestBalancerWithHANameNodes {
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
- Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
assertEquals(1, namenodes.size());
- final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
- conf);
+ final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
}
/**
- * Test Balancer request Standby NameNode when enable this feature.
- */
- @Test(timeout = 60000)
- public void testBalancerRequestSBNWithHA() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
- conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
- //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
- TestBalancer.initConf(conf);
- assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
- NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
- nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
- Configuration copiedConf = new Configuration(conf);
- cluster = new MiniDFSCluster.Builder(copiedConf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(TEST_CAPACITIES.length)
- .racks(TEST_RACKS)
- .simulatedCapacities(TEST_CAPACITIES)
- .build();
- // Try capture NameNodeConnector log.
- LogCapturer log =LogCapturer.captureLogs(
- LoggerFactory.getLogger(NameNodeConnector.class));
- HATestUtil.setFailoverConfigurations(cluster, conf);
- try {
- cluster.waitActive();
- cluster.transitionToActive(0);
- Thread.sleep(500);
- client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
- ClientProtocol.class).getProxy();
- doTest(conf);
- // Check getBlocks request to Standby NameNode.
- assertTrue(log.getOutput().contains(
- "Request #getBlocks to Standby NameNode success."));
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
* Test Balancer with ObserverNodes.
*/
@Test(timeout = 120000)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org