You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/04/17 03:26:28 UTC
hadoop git commit: HDFS-7891. A block placement policy with best rack
failure tolerance. Contributed by Walter Su
Repository: hadoop
Updated Branches:
refs/heads/trunk 4308910ee -> 9595cc003
HDFS-7891. A block placement policy with best rack failure tolerance. Contributed by Walter Su
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9595cc00
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9595cc00
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9595cc00
Branch: refs/heads/trunk
Commit: 9595cc003ca5ed3d59b6942056a4fcb9080f79c9
Parents: 4308910
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Apr 16 18:25:53 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Apr 16 18:25:53 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../BlockPlacementPolicyDefault.java | 2 +-
.../BlockPlacementPolicyRackFaultTolarent.java | 154 ++++++++++++++
...stBlockPlacementPolicyRackFaultTolarent.java | 209 +++++++++++++++++++
4 files changed, 367 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dd0979b..e977e6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED
NEW FEATURES
+ HDFS-7891. A block placement policy with best rack failure tolerance.
+ (Walter Su via szetszwo)
+
IMPROVEMENTS
HDFS-3918. EditLogTailer shouldn't log WARN when other node
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 09db986..c2752ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -237,7 +237,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* is independent of the number of chosen nodes, as it is calculated
* using the target number of replicas.
*/
- private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves();
int totalNumOfReplicas = numOfChosen + numOfReplicas;
if (totalNumOfReplicas > clusterSize) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..4dbf384
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is to spread the replicas across as many racks as possible.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolarent extends BlockPlacementPolicyDefault {
+
+ @Override
+ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+ int clusterSize = clusterMap.getNumOfLeaves();
+ int totalNumOfReplicas = numOfChosen + numOfReplicas;
+ if (totalNumOfReplicas > clusterSize) {
+ numOfReplicas -= (totalNumOfReplicas-clusterSize);
+ totalNumOfReplicas = clusterSize;
+ }
+ // No calculation needed when there is only one rack or picking one node.
+ int numOfRacks = clusterMap.getNumOfRacks();
+ if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+ return new int[] {numOfReplicas, totalNumOfReplicas};
+ }
+ if(totalNumOfReplicas<numOfRacks){
+ return new int[] {numOfReplicas, 1};
+ }
+ int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
+ return new int[] {numOfReplicas, maxNodesPerRack};
+ }
+
+ /**
+ * Choose numOfReplicas targets in order:
+ * 1. If the total number of expected replicas is less than numOfRacks, or is
+ * an exact multiple of numOfRacks, it chooses them randomly in a single pass.
+ * 2. Otherwise it chooses in two passes:
+ * 2a. Fill each rack with exactly (maxNodesPerRack-1) replicas.
+ * 2b. For some random racks, place one more replica on each of them, until
+ * numOfReplicas have been chosen. <br>
+ * In the end, the numbers of replicas on any two racks
+ * differ by no more than 1.
+ * Either way it always prefers local storage.
+ * @return local node of writer
+ */
+ @Override
+ protected Node chooseTargetInOrder(int numOfReplicas,
+ Node writer,
+ final Set<Node> excludedNodes,
+ final long blocksize,
+ final int maxNodesPerRack,
+ final List<DatanodeStorageInfo> results,
+ final boolean avoidStaleNodes,
+ final boolean newBlock,
+ EnumMap<StorageType, Integer> storageTypes)
+ throws NotEnoughReplicasException {
+ int totalReplicaExpected = results.size() + numOfReplicas;
+ int numOfRacks = clusterMap.getNumOfRacks();
+ if (totalReplicaExpected < numOfRacks ||
+ totalReplicaExpected % numOfRacks == 0) {
+ writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+ return writer;
+ }
+
+ assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+ // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+ // replicas.
+ HashMap<String, Integer> rackCounts = new HashMap<>();
+ for (DatanodeStorageInfo dsInfo : results) {
+ String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+ Integer count = rackCounts.get(rack);
+ if (count != null) {
+ rackCounts.put(rack, count + 1);
+ } else {
+ rackCounts.put(rack, 1);
+ }
+ }
+ int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+ for (int count : rackCounts.values()) {
+ if (count > maxNodesPerRack -1) {
+ excess += count - (maxNodesPerRack -1);
+ }
+ }
+ numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+ (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+ // Fill each rack exactly (maxNodesPerRack-1) replicas.
+ writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+ blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+ for (DatanodeStorageInfo resultStorage : results) {
+ addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+ }
+
+ // For some racks, place one more replica to each one of them.
+ numOfReplicas = totalReplicaExpected - results.size();
+ chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+ return writer;
+ }
+
+ /**
+ * Randomly choose <i>numOfReplicas</i> targets from the root scope
+ * (<i>NodeBase.ROOT</i>), except that the first replica prefers local storage.
+ * @return local node of writer.
+ */
+ private Node chooseOnce(int numOfReplicas,
+ Node writer,
+ final Set<Node> excludedNodes,
+ final long blocksize,
+ final int maxNodesPerRack,
+ final List<DatanodeStorageInfo> results,
+ final boolean avoidStaleNodes,
+ EnumMap<StorageType, Integer> storageTypes)
+ throws NotEnoughReplicasException {
+ if (numOfReplicas == 0) {
+ return writer;
+ }
+ writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+ .getDatanodeDescriptor();
+ if (--numOfReplicas == 0) {
+ return writer;
+ }
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+ return writer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9595cc00/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..d86a267
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockPlacementPolicyRackFaultTolarent {
+
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
+ private MiniDFSCluster cluster = null;
+ private NamenodeProtocols nameNodeRpc = null;
+ private FSNamesystem namesystem = null;
+ private PermissionStatus perm = null;
+
+ @Before
+ public void setup() throws IOException {
+ StaticMapping.resetMap();
+ Configuration conf = new HdfsConfiguration();
+ final ArrayList<String> rackList = new ArrayList<String>();
+ final ArrayList<String> hostList = new ArrayList<String>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 2; j++) {
+ rackList.add("/rack" + i);
+ hostList.add("/host" + i + j);
+ }
+ }
+ conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ BlockPlacementPolicyRackFaultTolarent.class,
+ BlockPlacementPolicy.class);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(hostList.size())
+ .racks(rackList.toArray(new String[rackList.size()]))
+ .hosts(hostList.toArray(new String[hostList.size()]))
+ .build();
+ cluster.waitActive();
+ nameNodeRpc = cluster.getNameNodeRpc();
+ namesystem = cluster.getNamesystem();
+ perm = new PermissionStatus("TestBlockPlacementPolicyEC", null,
+ FsPermission.getDefault());
+ }
+
+ @After
+ public void teardown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testChooseTarget() throws Exception {
+ doTestChooseTargetNormalCase();
+ doTestChooseTargetSpecialCase();
+ }
+
+ private void doTestChooseTargetNormalCase() throws Exception {
+ String clientMachine = "client.foo.com";
+ short[][] testSuite = {
+ {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 1}, {10, 1}, {10, 6}, {11, 6},
+ {11, 9}
+ };
+ // Run the whole test suite 5 times, creating one new file per test case
+ int fileCount = 0;
+ for (int i = 0; i < 5; i++) {
+ for (short[] testCase : testSuite) {
+ short replication = testCase[0];
+ short additionalReplication = testCase[1];
+ String src = "/testfile" + (fileCount++);
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+ clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+ replication, DEFAULT_BLOCK_SIZE, null, false);
+
+ //test chooseTarget for new file
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+ null, null, fileStatus.getFileId(), null);
+ doTestLocatedBlock(replication, locatedBlock);
+
+ //test chooseTarget for existing file.
+ LocatedBlock additionalLocatedBlock =
+ nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+ locatedBlock.getBlock(), locatedBlock.getLocations(),
+ locatedBlock.getStorageIDs(), new DatanodeInfo[0],
+ additionalReplication, clientMachine);
+ doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock);
+ }
+ }
+ }
+
+ /**
+ * Test more randomly, so that some special cases are covered,
+ * such as when some racks already have 2 replicas while other racks have none:
+ * in that case we should choose the racks that have none.
+ */
+ private void doTestChooseTargetSpecialCase() throws Exception {
+ String clientMachine = "client.foo.com";
+ // Test a single file created with replication 20
+ String src = "/testfile_1_";
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+ clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+ (short) 20, DEFAULT_BLOCK_SIZE, null, false);
+
+ //test chooseTarget for new file
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+ null, null, fileStatus.getFileId(), null);
+ doTestLocatedBlock(20, locatedBlock);
+
+ DatanodeInfo[] locs = locatedBlock.getLocations();
+ String[] storageIDs = locatedBlock.getStorageIDs();
+
+ for (int time = 0; time < 5; time++) {
+ shuffle(locs, storageIDs);
+ for (int i = 1; i < locs.length; i++) {
+ DatanodeInfo[] partLocs = new DatanodeInfo[i];
+ String[] partStorageIDs = new String[i];
+ System.arraycopy(locs, 0, partLocs, 0, i);
+ System.arraycopy(storageIDs, 0, partStorageIDs, 0, i);
+ for (int j = 1; j < 20 - i; j++) {
+ LocatedBlock additionalLocatedBlock =
+ nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+ locatedBlock.getBlock(), partLocs,
+ partStorageIDs, new DatanodeInfo[0],
+ j, clientMachine);
+ doTestLocatedBlock(i + j, additionalLocatedBlock);
+ }
+ }
+ }
+ }
+
+ private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
+ int length = locs.length;
+ Object[][] pairs = new Object[length][];
+ for (int i = 0; i < length; i++) {
+ pairs[i] = new Object[]{locs[i], storageIDs[i]};
+ }
+ DFSUtil.shuffle(pairs);
+ for (int i = 0; i < length; i++) {
+ locs[i] = (DatanodeInfo) pairs[i][0];
+ storageIDs[i] = (String) pairs[i][1];
+ }
+ }
+
+ private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
+ assertEquals(replication, locatedBlock.getLocations().length);
+
+ HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+ for (DatanodeInfo node :
+ locatedBlock.getLocations()) {
+ addToRacksCount(node.getNetworkLocation(), racksCount);
+ }
+
+ int minCount = Integer.MAX_VALUE;
+ int maxCount = Integer.MIN_VALUE;
+ for (Integer rackCount : racksCount.values()) {
+ minCount = Math.min(minCount, rackCount);
+ maxCount = Math.max(maxCount, rackCount);
+ }
+ assertTrue(maxCount - minCount <= 1);
+ }
+
+ private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
+ Integer count = racksCount.get(rack);
+ if (count == null) {
+ racksCount.put(rack, 1);
+ } else {
+ racksCount.put(rack, count + 1);
+ }
+ }
+}