Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2021/12/09 15:42:43 UTC
[hadoop] branch branch-3.2 updated: HDFS-16333. fix balancer bug when transfer an EC block (#3777)
This is an automated email from the ASF dual-hosted git repository.
tasanuma pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 72ffbd9 HDFS-16333. fix balancer bug when transfer an EC block (#3777)
72ffbd9 is described below
commit 72ffbd956a6d5090a7b28c63183b032ab5467e68
Author: qinyuren <14...@qq.com>
AuthorDate: Thu Dec 9 23:33:03 2021 +0800
HDFS-16333. fix balancer bug when transfer an EC block (#3777)
---
.../hadoop/hdfs/server/balancer/Dispatcher.java | 48 +++++++-
.../hadoop/hdfs/server/balancer/TestBalancer.java | 123 ++++++++++++++++++++-
2 files changed, 166 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 1694a12..0581793 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -490,7 +490,7 @@ public class Dispatcher {
public static class DBlockStriped extends DBlock {
- final byte[] indices;
+ private byte[] indices;
final short dataBlockNum;
final int cellSize;
@@ -527,6 +527,29 @@ public class Dispatcher {
}
return block.getNumBytes();
}
+
+ public void setIndices(byte[] indices) {
+ this.indices = indices;
+ }
+
+ /**
+ * Adjust the EC block indices: remove the elements at the positions
+ * listed in adjustList from indices.
+ * @param adjustList the positions to remove from indices
+ */
+ public void adjustIndices(List<Integer> adjustList) {
+ if (adjustList.isEmpty()) {
+ return;
+ }
+
+ byte[] newIndices = new byte[indices.length - adjustList.size()];
+ for (int i = 0, j = 0; i < indices.length; ++i) {
+ if (!adjustList.contains(i)) {
+ newIndices[j] = indices[i];
+ ++j;
+ }
+ }
+ this.indices = newIndices;
+ }
}
/** The class represents a desired move. */
@@ -803,7 +826,7 @@ public class Dispatcher {
*
* @return the total size of the received blocks in the number of bytes.
*/
- private long getBlockList() throws IOException {
+ private long getBlockList() throws IOException, IllegalArgumentException {
final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlksLocs =
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
@@ -840,7 +863,14 @@ public class Dispatcher {
synchronized (block) {
block.clearLocations();
+ if (blkLocs instanceof StripedBlockWithLocations) {
+ // The EC block's indices may have been adjusted before; reset them
+ // here to avoid repeated adjustments.
+ ((DBlockStriped) block).setIndices(
+ ((StripedBlockWithLocations) blkLocs).getIndices());
+ }
+
// update locations
+ List<Integer> adjustList = new ArrayList<>();
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
final StorageType[] storageTypes = blkLocs.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) {
@@ -848,8 +878,20 @@ public class Dispatcher {
datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown
block.addLocation(g);
+ } else if (blkLocs instanceof StripedBlockWithLocations) {
+ // Some datanodes may be missing from storageGroupMap due to a
+ // decommission operation or the balancer CLI's "-exclude" parameter.
+ adjustList.add(i);
}
}
+
+ if (!adjustList.isEmpty()) {
+ // block.locations no longer matches block.indices; adjust the indices
+ // so #getInternalBlock returns the correct internal block for a datanode.
+ ((DBlockStriped) block).adjustIndices(adjustList);
+ Preconditions.checkArgument(((DBlockStriped) block).indices.length
+ == block.locations.size());
+ }
}
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
if (LOG.isTraceEnabled()) {
@@ -969,7 +1011,7 @@ public class Dispatcher {
}
blocksToReceive -= received;
continue;
- } catch (IOException e) {
+ } catch (IOException | IllegalArgumentException e) {
LOG.warn("Exception while getting reportedBlock list", e);
return;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 3c624cd..498d785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -468,6 +469,19 @@ public class TestBalancer {
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes) throws IOException, TimeoutException {
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
+ }
+
+ /**
+ * Wait until balanced: each datanode's utilization is within
+ * BALANCE_ALLOWED_VARIANCE of the average.
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+ ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
+ int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
+ throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.monotonicNow() + timeout;
@@ -489,7 +503,9 @@ public class TestBalancer {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
- assertTrue(nodeUtilization == 0);
+ if (checkExcludeNodesUtilization) {
+ assertTrue(nodeUtilization == 0);
+ }
actualExcludedNodeCount++;
continue;
}
@@ -774,6 +790,12 @@ public class TestBalancer {
private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes)
throws Exception {
+ runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
+ }
+
+ private void runBalancer(Configuration conf, long totalUsedSpace,
+ long totalCapacity, BalancerParameters p, int excludedNodes,
+ boolean checkExcludeNodesUtilization) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
int retry = 5;
@@ -794,7 +816,7 @@ public class TestBalancer {
LOG.info(" .");
try {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
- excludedNodes);
+ excludedNodes, checkExcludeNodesUtilization);
} catch (TimeoutException e) {
// See HDFS-11682. NN may not get heartbeat to reflect the newest
// block changes.
@@ -1628,6 +1650,103 @@ public class TestBalancer {
}
}
+ @Test
+ public void testBalancerWithExcludeListWithStripedFile() throws Exception {
+ Configuration conf = new Configuration();
+ initConfWithStripe(conf);
+ NameNodeConnector.setWrite2IdFile(true);
+ doTestBalancerWithExcludeListWithStripedFile(conf);
+ NameNodeConnector.setWrite2IdFile(false);
+ }
+
+ private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
+ int numOfDatanodes = dataBlocks + parityBlocks + 5;
+ int numOfRacks = dataBlocks;
+ long capacity = 20 * defaultBlockSize;
+ long[] capacities = new long[numOfDatanodes];
+ Arrays.fill(capacities, capacity);
+ String[] racks = new String[numOfDatanodes];
+ for (int i = 0; i < numOfDatanodes; i++) {
+ racks[i] = "/rack" + (i % numOfRacks);
+ }
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numOfDatanodes)
+ .racks(racks)
+ .simulatedCapacities(capacities)
+ .build();
+
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+ client.enableErasureCodingPolicy(
+ StripedFileTestUtil.getDefaultECPolicy().getName());
+ client.setErasureCodingPolicy("/",
+ StripedFileTestUtil.getDefaultECPolicy().getName());
+
+ long totalCapacity = sum(capacities);
+
+ // Fill the cluster to 30% with user data; with parity it will be 45% full.
+ long fileLen = totalCapacity * 3 / 10;
+ long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+ FileSystem fs = cluster.getFileSystem(0);
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+ // verify locations of striped blocks
+ LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+ // get datanode report
+ DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
+ long totalBlocks = 0;
+ for (DatanodeInfo dn : datanodeReport) {
+ totalBlocks += dn.getNumBlocks();
+ }
+
+ // add datanode in new rack
+ String newRack = "/rack" + (++numOfRacks);
+ cluster.startDataNodes(conf, 2, true, null,
+ new String[]{newRack, newRack}, null,
+ new long[]{capacity, capacity});
+ totalCapacity += capacity * 2;
+ cluster.triggerHeartbeats();
+
+ // add datanode to exclude list
+ Set<String> excludedList = new HashSet<>();
+ excludedList.add(datanodeReport[0].getXferAddr());
+ BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
+ pBuilder.setExcludedNodes(excludedList);
+
+ // start balancer and check the failed num of moving task
+ runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
+ excludedList.size(), false);
+
+ // check total blocks, max wait time 60s
+ final long blocksBeforeBalancer = totalBlocks;
+ GenericTestUtils.waitFor(() -> {
+ DatanodeInfo[] datanodeInfos = null;
+ try {
+ cluster.triggerHeartbeats();
+ datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ long blocksAfterBalancer = 0;
+ for (DatanodeInfo dn : datanodeInfos) {
+ blocksAfterBalancer += dn.getNumBlocks();
+ }
+ return blocksBeforeBalancer == blocksAfterBalancer;
+ }, 3000, 60000);
+
+ // verify locations of striped blocks
+ locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
private void testNullStripedBlocks(Configuration conf) throws IOException {
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
DFSUtil.getInternalNsRpcUris(conf),
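As an aside on the capacity math in the test, here is a sketch assuming the default RS(6,3) policy, i.e. dataBlocks = 6 and parityBlocks = 3 (the capacity value is an arbitrary example):

    public class EcSpaceMathSketch {
      public static void main(String[] args) {
        int dataBlocks = 6;    // assumed default RS(6,3) policy
        int parityBlocks = 3;
        long totalCapacity = 1000L;               // arbitrary example capacity
        long fileLen = totalCapacity * 3 / 10;    // 30% of capacity as user data
        // Raw usage includes parity blocks: 300 * (6 + 3) / 6 = 450, i.e. 45%
        // full, matching the 45% figure in the test comment above.
        long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
        System.out.println(totalUsedSpace + " of " + totalCapacity); // 450 of 1000
      }
    }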