You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/04/17 03:26:28 UTC

hadoop git commit: HDFS-7891. A block placement policy with best rack failure tolerance. Contributed by Walter Su

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4308910ee -> 9595cc003


HDFS-7891. A block placement policy with best rack failure tolerance.  Contributed by Walter Su


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9595cc00
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9595cc00
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9595cc00

Branch: refs/heads/trunk
Commit: 9595cc003ca5ed3d59b6942056a4fcb9080f79c9
Parents: 4308910
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Apr 16 18:25:53 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Apr 16 18:25:53 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../BlockPlacementPolicyDefault.java            |   2 +-
 .../BlockPlacementPolicyRackFaultTolarent.java  | 154 ++++++++++++++
 ...stBlockPlacementPolicyRackFaultTolarent.java | 209 +++++++++++++++++++
 4 files changed, 367 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dd0979b..e977e6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-7891. A block placement policy with best rack failure tolerance.
+    (Walter Su via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3918. EditLogTailer shouldn't log WARN when other node

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 09db986..c2752ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -237,7 +237,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    *         is independent of the number of chosen nodes, as it is calculated
    *         using the target number of replicas.
    */
-  private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+  protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
     int clusterSize = clusterMap.getNumOfLeaves();
     int totalNumOfReplicas = numOfChosen + numOfReplicas;
     if (totalNumOfReplicas > clusterSize) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..4dbf384
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is that it tries its best to place the replicas to most racks.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolarent extends BlockPlacementPolicyDefault {
+
+  @Override
+  protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = numOfChosen + numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    // No calculation needed when there is only one rack or picking one node.
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+      return new int[] {numOfReplicas, totalNumOfReplicas};
+    }
+    if(totalNumOfReplicas<numOfRacks){
+      return new int[] {numOfReplicas, 1};
+    }
+    int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
+
+  /**
+   * Choose numOfReplicas in order:
+   * 1. If total replica expected is less than numOfRacks in cluster, it choose
+   * randomly.
+   * 2. If total replica expected is bigger than numOfRacks, it choose:
+   *  2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
+   *  2b. For some random racks, place one more replica to each one of them, until
+   *  numOfReplicas have been chosen. <br>
+   * In the end, the difference of the numbers of replicas for each two racks
+   * is no more than 1.
+   * Either way it always prefer local storage.
+   * @return local node of writer
+   */
+  @Override
+  protected Node chooseTargetInOrder(int numOfReplicas,
+                                 Node writer,
+                                 final Set<Node> excludedNodes,
+                                 final long blocksize,
+                                 final int maxNodesPerRack,
+                                 final List<DatanodeStorageInfo> results,
+                                 final boolean avoidStaleNodes,
+                                 final boolean newBlock,
+                                 EnumMap<StorageType, Integer> storageTypes)
+                                 throws NotEnoughReplicasException {
+    int totalReplicaExpected = results.size() + numOfReplicas;
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (totalReplicaExpected < numOfRacks ||
+        totalReplicaExpected % numOfRacks == 0) {
+      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+      return writer;
+    }
+
+    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+    // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+    // replicas.
+    HashMap<String, Integer> rackCounts = new HashMap<>();
+    for (DatanodeStorageInfo dsInfo : results) {
+      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+      Integer count = rackCounts.get(rack);
+      if (count != null) {
+        rackCounts.put(rack, count + 1);
+      } else {
+        rackCounts.put(rack, 1);
+      }
+    }
+    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+    for (int count : rackCounts.values()) {
+      if (count > maxNodesPerRack -1) {
+        excess += count - (maxNodesPerRack -1);
+      }
+    }
+    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+    // Fill each rack exactly (maxNodesPerRack-1) replicas.
+    writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+        blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+    for (DatanodeStorageInfo resultStorage : results) {
+      addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+    }
+
+    // For some racks, place one more replica to each one of them.
+    numOfReplicas = totalReplicaExpected - results.size();
+    chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+    return writer;
+  }
+
+  /**
+   * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+   * Except that 1st replica prefer local storage.
+   * @return local node of writer.
+   */
+  private Node chooseOnce(int numOfReplicas,
+                            Node writer,
+                            final Set<Node> excludedNodes,
+                            final long blocksize,
+                            final int maxNodesPerRack,
+                            final List<DatanodeStorageInfo> results,
+                            final boolean avoidStaleNodes,
+                            EnumMap<StorageType, Integer> storageTypes)
+                            throws NotEnoughReplicasException {
+    if (numOfReplicas == 0) {
+      return writer;
+    }
+    writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+        .getDatanodeDescriptor();
+    if (--numOfReplicas == 0) {
+      return writer;
+    }
+    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..d86a267
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockPlacementPolicyRackFaultTolarent {
+
+  private static final int DEFAULT_BLOCK_SIZE = 1024;
+  private MiniDFSCluster cluster = null;
+  private NamenodeProtocols nameNodeRpc = null;
+  private FSNamesystem namesystem = null;
+  private PermissionStatus perm = null;
+
+  @Before
+  public void setup() throws IOException {
+    StaticMapping.resetMap();
+    Configuration conf = new HdfsConfiguration();
+    final ArrayList<String> rackList = new ArrayList<String>();
+    final ArrayList<String> hostList = new ArrayList<String>();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 2; j++) {
+        rackList.add("/rack" + i);
+        hostList.add("/host" + i + j);
+      }
+    }
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolarent.class,
+        BlockPlacementPolicy.class);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(hostList.size())
+        .racks(rackList.toArray(new String[rackList.size()]))
+        .hosts(hostList.toArray(new String[hostList.size()]))
+        .build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+    perm = new PermissionStatus("TestBlockPlacementPolicyEC", null,
+        FsPermission.getDefault());
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChooseTarget() throws Exception {
+    doTestChooseTargetNormalCase();
+    doTestChooseTargetSpecialCase();
+  }
+
+  private void doTestChooseTargetNormalCase() throws Exception {
+    String clientMachine = "client.foo.com";
+    short[][] testSuite = {
+        {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 1}, {10, 1}, {10, 6}, {11, 6},
+        {11, 9}
+    };
+    // Test 5 files
+    int fileCount = 0;
+    for (int i = 0; i < 5; i++) {
+      for (short[] testCase : testSuite) {
+        short replication = testCase[0];
+        short additionalReplication = testCase[1];
+        String src = "/testfile" + (fileCount++);
+        // Create the file with client machine
+        HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+            clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+            replication, DEFAULT_BLOCK_SIZE, null, false);
+
+        //test chooseTarget for new file
+        LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+            null, null, fileStatus.getFileId(), null);
+        doTestLocatedBlock(replication, locatedBlock);
+
+        //test chooseTarget for existing file.
+        LocatedBlock additionalLocatedBlock =
+            nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+                locatedBlock.getBlock(), locatedBlock.getLocations(),
+                locatedBlock.getStorageIDs(), new DatanodeInfo[0],
+                additionalReplication, clientMachine);
+        doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock);
+      }
+    }
+  }
+
+  /**
+   * Test more randomly. So it covers some special cases.
+   * Like when some racks already have 2 replicas, while some racks have none,
+   * we should choose the racks that have none.
+   */
+  private void doTestChooseTargetSpecialCase() throws Exception {
+    String clientMachine = "client.foo.com";
+    // Test 5 files
+    String src = "/testfile_1_";
+    // Create the file with client machine
+    HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+        clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+        (short) 20, DEFAULT_BLOCK_SIZE, null, false);
+
+    //test chooseTarget for new file
+    LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+        null, null, fileStatus.getFileId(), null);
+    doTestLocatedBlock(20, locatedBlock);
+
+    DatanodeInfo[] locs = locatedBlock.getLocations();
+    String[] storageIDs = locatedBlock.getStorageIDs();
+
+    for (int time = 0; time < 5; time++) {
+      shuffle(locs, storageIDs);
+      for (int i = 1; i < locs.length; i++) {
+        DatanodeInfo[] partLocs = new DatanodeInfo[i];
+        String[] partStorageIDs = new String[i];
+        System.arraycopy(locs, 0, partLocs, 0, i);
+        System.arraycopy(storageIDs, 0, partStorageIDs, 0, i);
+        for (int j = 1; j < 20 - i; j++) {
+          LocatedBlock additionalLocatedBlock =
+              nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+                  locatedBlock.getBlock(), partLocs,
+                  partStorageIDs, new DatanodeInfo[0],
+                  j, clientMachine);
+          doTestLocatedBlock(i + j, additionalLocatedBlock);
+        }
+      }
+    }
+  }
+
+  private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
+    int length = locs.length;
+    Object[][] pairs = new Object[length][];
+    for (int i = 0; i < length; i++) {
+      pairs[i] = new Object[]{locs[i], storageIDs[i]};
+    }
+    DFSUtil.shuffle(pairs);
+    for (int i = 0; i < length; i++) {
+      locs[i] = (DatanodeInfo) pairs[i][0];
+      storageIDs[i] = (String) pairs[i][1];
+    }
+  }
+
+  private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
+    assertEquals(replication, locatedBlock.getLocations().length);
+
+    HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+    for (DatanodeInfo node :
+        locatedBlock.getLocations()) {
+      addToRacksCount(node.getNetworkLocation(), racksCount);
+    }
+
+    int minCount = Integer.MAX_VALUE;
+    int maxCount = Integer.MIN_VALUE;
+    for (Integer rackCount : racksCount.values()) {
+      minCount = Math.min(minCount, rackCount);
+      maxCount = Math.max(maxCount, rackCount);
+    }
+    assertTrue(maxCount - minCount <= 1);
+  }
+
+  private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
+    Integer count = racksCount.get(rack);
+    if (count == null) {
+      racksCount.put(rack, 1);
+    } else {
+      racksCount.put(rack, count + 1);
+    }
+  }
+}