Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2021/12/09 15:33:34 UTC

[hadoop] branch branch-3.3 updated: HDFS-16333. fix balancer bug when transfer an EC block (#3777)

This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0dfb4eb  HDFS-16333. fix balancer bug when transfer an EC block (#3777)
0dfb4eb is described below

commit 0dfb4eb6029d52a0fed10c69a499cbefea0af53e
Author: qinyuren <14...@qq.com>
AuthorDate: Thu Dec 9 23:33:03 2021 +0800

    HDFS-16333. fix balancer bug when transfer an EC block (#3777)
---
 .../hadoop/hdfs/server/balancer/Dispatcher.java    |  48 +++++++-
 .../hadoop/hdfs/server/balancer/TestBalancer.java  | 123 ++++++++++++++++++++-
 2 files changed, 166 insertions(+), 5 deletions(-)
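
For context: in a striped (EC) block group, indices[i] names the internal
block stored at locations.get(i). Before this fix, the dispatcher rebuilt a
block's location list while leaving the index array untouched, so any
datanode skipped during the rebuild (decommissioned, or excluded via
"-exclude") left the two arrays misaligned and the balancer could transfer
the wrong internal block. A minimal standalone sketch of the realignment
idea follows; the class and method names are illustrative only, not part of
the patch:

import java.util.Arrays;
import java.util.List;

public class StripedIndicesSketch {
  // Drop the index entries at the given positions so that indices[i]
  // once again describes the i-th surviving location.
  static byte[] dropPositions(byte[] indices, List<Integer> toDrop) {
    byte[] result = new byte[indices.length - toDrop.size()];
    for (int i = 0, j = 0; i < indices.length; i++) {
      if (!toDrop.contains(i)) {
        result[j++] = indices[i];
      }
    }
    return result;
  }

  public static void main(String[] args) {
    byte[] indices = {0, 1, 2, 3};             // internal-block ids, one per location
    List<Integer> dropped = Arrays.asList(1);  // location 1 is on an excluded datanode
    System.out.println(Arrays.toString(dropPositions(indices, dropped)));
    // prints [0, 2, 3]: positions line up with the remaining locations again
  }
}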

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 5411b5c..8be3fb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -491,7 +491,7 @@ public class Dispatcher {
 
   public static class DBlockStriped extends DBlock {
 
-    final byte[] indices;
+    private byte[] indices;
     final short dataBlockNum;
     final int cellSize;
 
@@ -528,6 +528,29 @@ public class Dispatcher {
       }
       return block.getNumBytes();
     }
+
+    public void setIndices(byte[] indices) {
+      this.indices = indices;
+    }
+
+    /**
+     * Adjust the EC block indices: remove the adjustList positions from indices.
+     * @param adjustList the positions to remove from indices
+     */
+    public void adjustIndices(List<Integer> adjustList) {
+      if (adjustList.isEmpty()) {
+        return;
+      }
+
+      byte[] newIndices = new byte[indices.length - adjustList.size()];
+      for (int i = 0, j = 0; i < indices.length; ++i) {
+        if (!adjustList.contains(i)) {
+          newIndices[j] = indices[i];
+          ++j;
+        }
+      }
+      this.indices = newIndices;
+    }
   }
 
   /** The class represents a desired move. */
@@ -804,7 +827,7 @@ public class Dispatcher {
      * 
      * @return the total size of the received blocks in the number of bytes.
      */
-    private long getBlockList() throws IOException {
+    private long getBlockList() throws IOException, IllegalArgumentException {
       final long size = Math.min(getBlocksSize, blocksToReceive);
       final BlocksWithLocations newBlksLocs =
           nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
@@ -841,7 +864,14 @@ public class Dispatcher {
           synchronized (block) {
             block.clearLocations();
 
+            if (blkLocs instanceof StripedBlockWithLocations) {
+              // the EC block's indices may have been adjusted earlier; reset to avoid repeated adjustments
+              ((DBlockStriped) block).setIndices(
+                  ((StripedBlockWithLocations) blkLocs).getIndices());
+            }
+
             // update locations
+            List<Integer> adjustList = new ArrayList<>();
             final String[] datanodeUuids = blkLocs.getDatanodeUuids();
             final StorageType[] storageTypes = blkLocs.getStorageTypes();
             for (int i = 0; i < datanodeUuids.length; i++) {
@@ -849,8 +879,20 @@ public class Dispatcher {
                   datanodeUuids[i], storageTypes[i]);
               if (g != null) { // not unknown
                 block.addLocation(g);
+              } else if (blkLocs instanceof StripedBlockWithLocations) {
+                // some datanodes may be missing from storageGroupMap due to a
+                // decommission operation or the balancer CLI "-exclude" parameter
+                adjustList.add(i);
               }
             }
+
+            if (!adjustList.isEmpty()) {
+              // block.locations no longer matches block.indices; adjust indices
+              // so #getInternalBlock returns the correct internal block per datanode
+              ((DBlockStriped) block).adjustIndices(adjustList);
+              Preconditions.checkArgument(((DBlockStriped) block).indices.length
+                  == block.locations.size());
+            }
           }
           if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
             if (LOG.isTraceEnabled()) {
@@ -970,7 +1012,7 @@ public class Dispatcher {
             }
             blocksToReceive -= received;
             continue;
-          } catch (IOException e) {
+          } catch (IOException | IllegalArgumentException e) {
             LOG.warn("Exception while getting reportedBlock list", e);
             return;
           }
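
The Preconditions.checkArgument above throws an unchecked
IllegalArgumentException if indices and locations still disagree after the
adjustment, which is why getBlockList now declares it and the catch clause
above is widened to handle it. The lookup that this invariant protects can
be pictured as follows; a hedged simplification with hypothetical names,
mirroring the shape of DBlockStriped#getInternalBlock rather than quoting it:

import java.util.List;

public class InternalBlockLookupSketch {
  // Find the datanode's position among the block's locations, then read the
  // internal-block id stored at the same position. If indices is stale
  // (longer than locations), this lookup can name the wrong internal block,
  // which is the misbehavior HDFS-16333 fixes.
  static int internalBlockId(List<String> locationUuids, byte[] indices,
      String datanodeUuid) {
    int idxInLocs = locationUuids.indexOf(datanodeUuid);
    return idxInLocs < 0 ? -1 : indices[idxInLocs];
  }
}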
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 3c624cd..498d785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -468,6 +469,19 @@ public class TestBalancer {
   static void waitForBalancer(long totalUsedSpace, long totalCapacity,
       ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
       int expectedExcludedNodes) throws IOException, TimeoutException {
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
+  }
+
+  /**
+   * Wait until balanced: each datanode's utilization is within
+   * BALANCE_ALLOWED_VARIANCE of the average.
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+      ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
+      int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
+      throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.monotonicNow() + timeout;
@@ -489,7 +503,9 @@ public class TestBalancer {
         double nodeUtilization = ((double)datanode.getDfsUsed())
             / datanode.getCapacity();
         if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
-          assertTrue(nodeUtilization == 0);
+          if (checkExcludeNodesUtilization) {
+            assertTrue(nodeUtilization == 0);
+          }
           actualExcludedNodeCount++;
           continue;
         }
@@ -774,6 +790,12 @@ public class TestBalancer {
   private void runBalancer(Configuration conf, long totalUsedSpace,
       long totalCapacity, BalancerParameters p, int excludedNodes)
       throws Exception {
+    runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
+  }
+
+  private void runBalancer(Configuration conf, long totalUsedSpace,
+      long totalCapacity, BalancerParameters p, int excludedNodes,
+      boolean checkExcludeNodesUtilization) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
 
     int retry = 5;
@@ -794,7 +816,7 @@ public class TestBalancer {
       LOG.info("  .");
       try {
         waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
-            excludedNodes);
+            excludedNodes, checkExcludeNodesUtilization);
       } catch (TimeoutException e) {
         // See HDFS-11682. NN may not get heartbeat to reflect the newest
         // block changes.
@@ -1628,6 +1650,103 @@ public class TestBalancer {
     }
   }
 
+  @Test
+  public void testBalancerWithExcludeListWithStripedFile() throws Exception {
+    Configuration conf = new Configuration();
+    initConfWithStripe(conf);
+    NameNodeConnector.setWrite2IdFile(true);
+    doTestBalancerWithExcludeListWithStripedFile(conf);
+    NameNodeConnector.setWrite2IdFile(false);
+  }
+
+  private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
+    int numOfDatanodes = dataBlocks + parityBlocks + 5;
+    int numOfRacks = dataBlocks;
+    long capacity = 20 * defaultBlockSize;
+    long[] capacities = new long[numOfDatanodes];
+    Arrays.fill(capacities, capacity);
+    String[] racks = new String[numOfDatanodes];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      racks[i] = "/rack" + (i % numOfRacks);
+    }
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      client.enableErasureCodingPolicy(
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+      client.setErasureCodingPolicy("/",
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      long totalCapacity = sum(capacities);
+
+      // fill the cluster with 30% data; with parity overhead it will be 45% full
+      long fileLen = totalCapacity * 3 / 10;
+      long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+      FileSystem fs = cluster.getFileSystem(0);
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+      // verify locations of striped blocks
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+      // get datanode report
+      DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
+      long totalBlocks = 0;
+      for (DatanodeInfo dn : datanodeReport) {
+        totalBlocks += dn.getNumBlocks();
+      }
+
+      // add datanode in new rack
+      String newRack = "/rack" + (++numOfRacks);
+      cluster.startDataNodes(conf, 2, true, null,
+          new String[]{newRack, newRack}, null,
+          new long[]{capacity, capacity});
+      totalCapacity += capacity * 2;
+      cluster.triggerHeartbeats();
+
+      // add datanode to exclude list
+      Set<String> excludedList = new HashSet<>();
+      excludedList.add(datanodeReport[0].getXferAddr());
+      BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
+      pBuilder.setExcludedNodes(excludedList);
+
+      // start balancer and check the failed num of moving task
+      runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
+          excludedList.size(), false);
+
+      // check total blocks, max wait time 60s
+      final long blocksBeforeBalancer = totalBlocks;
+      GenericTestUtils.waitFor(() -> {
+        DatanodeInfo[] datanodeInfos = null;
+        try {
+          cluster.triggerHeartbeats();
+          datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+        long blocksAfterBalancer = 0;
+        for (DatanodeInfo dn : datanodeInfos) {
+          blocksAfterBalancer += dn.getNumBlocks();
+        }
+        return blocksBeforeBalancer == blocksAfterBalancer;
+      }, 3000, 60000);
+
+      // verify locations of striped blocks
+      locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private void testNullStripedBlocks(Configuration conf) throws IOException {
     NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
         DFSUtil.getInternalNsRpcUris(conf),

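The new test excludes one datanode, adds two datanodes on a fresh rack, runs
the balancer without asserting zero utilization on the excluded node, and
then polls until the cluster-wide block count matches its pre-balancing
value, confirming that moving EC blocks neither lost nor duplicated internal
blocks. The polling idiom is GenericTestUtils.waitFor, which re-evaluates a
condition every checkEveryMillis up to waitForMillis. A minimal sketch, where
countClusterBlocks() is a hypothetical stand-in for summing getNumBlocks()
over the datanode report:

import java.util.concurrent.TimeoutException;

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForSketch {
  static long countClusterBlocks() {
    return 0L; // placeholder: sum dn.getNumBlocks() across all datanodes
  }

  static void awaitStableBlockCount(long expected)
      throws TimeoutException, InterruptedException {
    // re-check every 3 seconds; fail with TimeoutException after 60 seconds
    GenericTestUtils.waitFor(() -> countClusterBlocks() == expected, 3000, 60000);
  }
}
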
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org