You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/06/03 00:48:36 UTC
hadoop git commit: HDFS-8513. Rename
BlockPlacementPolicyRackFaultTolarent to
BlockPlacementPolicyRackFaultTolerant. (wang)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 379ece15f -> a80a68c29
HDFS-8513. Rename BlockPlacementPolicyRackFaultTolarent to BlockPlacementPolicyRackFaultTolerant. (wang)
(cherry picked from commit c1d50a91f7c05e4aaf4655380c8dcd11703ff158)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a80a68c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a80a68c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a80a68c2
Branch: refs/heads/branch-2
Commit: a80a68c298f2def05bb08d087c0934eee11d6eb4
Parents: 379ece1
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Jun 2 15:48:26 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Jun 2 15:48:31 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../BlockPlacementPolicyRackFaultTolarent.java | 154 --------------
.../BlockPlacementPolicyRackFaultTolerant.java | 154 ++++++++++++++
...stBlockPlacementPolicyRackFaultTolarent.java | 209 -------------------
...stBlockPlacementPolicyRackFaultTolerant.java | 209 +++++++++++++++++++
5 files changed, 366 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a80a68c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cad0e86..ba4b250 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -254,6 +254,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8386. Improve synchronization of 'streamer' reference in
DFSOutputStream. (Rakesh R via wang)
+ HDFS-8513. Rename BlockPlacementPolicyRackFaultTolarent to
+ BlockPlacementPolicyRackFaultTolerant. (wang)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a80a68c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
deleted file mode 100644
index 4dbf384..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-
-import java.util.*;
-
-/**
- * The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The strategy is that it tries its best to place the replicas to most racks.
- */
-@InterfaceAudience.Private
-public class BlockPlacementPolicyRackFaultTolarent extends BlockPlacementPolicyDefault {
-
- @Override
- protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
- int clusterSize = clusterMap.getNumOfLeaves();
- int totalNumOfReplicas = numOfChosen + numOfReplicas;
- if (totalNumOfReplicas > clusterSize) {
- numOfReplicas -= (totalNumOfReplicas-clusterSize);
- totalNumOfReplicas = clusterSize;
- }
- // No calculation needed when there is only one rack or picking one node.
- int numOfRacks = clusterMap.getNumOfRacks();
- if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
- return new int[] {numOfReplicas, totalNumOfReplicas};
- }
- if(totalNumOfReplicas<numOfRacks){
- return new int[] {numOfReplicas, 1};
- }
- int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
- return new int[] {numOfReplicas, maxNodesPerRack};
- }
-
- /**
- * Choose numOfReplicas in order:
- * 1. If total replica expected is less than numOfRacks in cluster, it choose
- * randomly.
- * 2. If total replica expected is bigger than numOfRacks, it choose:
- * 2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
- * 2b. For some random racks, place one more replica to each one of them, until
- * numOfReplicas have been chosen. <br>
- * In the end, the difference of the numbers of replicas for each two racks
- * is no more than 1.
- * Either way it always prefer local storage.
- * @return local node of writer
- */
- @Override
- protected Node chooseTargetInOrder(int numOfReplicas,
- Node writer,
- final Set<Node> excludedNodes,
- final long blocksize,
- final int maxNodesPerRack,
- final List<DatanodeStorageInfo> results,
- final boolean avoidStaleNodes,
- final boolean newBlock,
- EnumMap<StorageType, Integer> storageTypes)
- throws NotEnoughReplicasException {
- int totalReplicaExpected = results.size() + numOfReplicas;
- int numOfRacks = clusterMap.getNumOfRacks();
- if (totalReplicaExpected < numOfRacks ||
- totalReplicaExpected % numOfRacks == 0) {
- writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes);
- return writer;
- }
-
- assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
-
- // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
- // replicas.
- HashMap<String, Integer> rackCounts = new HashMap<>();
- for (DatanodeStorageInfo dsInfo : results) {
- String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
- Integer count = rackCounts.get(rack);
- if (count != null) {
- rackCounts.put(rack, count + 1);
- } else {
- rackCounts.put(rack, 1);
- }
- }
- int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
- for (int count : rackCounts.values()) {
- if (count > maxNodesPerRack -1) {
- excess += count - (maxNodesPerRack -1);
- }
- }
- numOfReplicas = Math.min(totalReplicaExpected - results.size(),
- (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
-
- // Fill each rack exactly (maxNodesPerRack-1) replicas.
- writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
- blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
-
- for (DatanodeStorageInfo resultStorage : results) {
- addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
- }
-
- // For some racks, place one more replica to each one of them.
- numOfReplicas = totalReplicaExpected - results.size();
- chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-
- return writer;
- }
-
- /**
- * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
- * Except that 1st replica prefer local storage.
- * @return local node of writer.
- */
- private Node chooseOnce(int numOfReplicas,
- Node writer,
- final Set<Node> excludedNodes,
- final long blocksize,
- final int maxNodesPerRack,
- final List<DatanodeStorageInfo> results,
- final boolean avoidStaleNodes,
- EnumMap<StorageType, Integer> storageTypes)
- throws NotEnoughReplicasException {
- if (numOfReplicas == 0) {
- return writer;
- }
- writer = chooseLocalStorage(writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
- .getDatanodeDescriptor();
- if (--numOfReplicas == 0) {
- return writer;
- }
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes);
- return writer;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a80a68c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
new file mode 100644
index 0000000..f25fb15
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is that it tries its best to place the replicas to most racks.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
+
+ @Override
+ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+ int clusterSize = clusterMap.getNumOfLeaves();
+ int totalNumOfReplicas = numOfChosen + numOfReplicas;
+ if (totalNumOfReplicas > clusterSize) {
+ numOfReplicas -= (totalNumOfReplicas-clusterSize);
+ totalNumOfReplicas = clusterSize;
+ }
+ // No calculation needed when there is only one rack or picking one node.
+ int numOfRacks = clusterMap.getNumOfRacks();
+ if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+ return new int[] {numOfReplicas, totalNumOfReplicas};
+ }
+ if(totalNumOfReplicas<numOfRacks){
+ return new int[] {numOfReplicas, 1};
+ }
+ int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
+ return new int[] {numOfReplicas, maxNodesPerRack};
+ }
+
+ /**
+ * Choose numOfReplicas in order:
+ * 1. If total replica expected is less than numOfRacks in cluster, it choose
+ * randomly.
+ * 2. If total replica expected is bigger than numOfRacks, it choose:
+ * 2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
+ * 2b. For some random racks, place one more replica to each one of them, until
+ * numOfReplicas have been chosen. <br>
+ * In the end, the difference of the numbers of replicas for each two racks
+ * is no more than 1.
+ * Either way it always prefer local storage.
+ * @return local node of writer
+ */
+ @Override
+ protected Node chooseTargetInOrder(int numOfReplicas,
+ Node writer,
+ final Set<Node> excludedNodes,
+ final long blocksize,
+ final int maxNodesPerRack,
+ final List<DatanodeStorageInfo> results,
+ final boolean avoidStaleNodes,
+ final boolean newBlock,
+ EnumMap<StorageType, Integer> storageTypes)
+ throws NotEnoughReplicasException {
+ int totalReplicaExpected = results.size() + numOfReplicas;
+ int numOfRacks = clusterMap.getNumOfRacks();
+ if (totalReplicaExpected < numOfRacks ||
+ totalReplicaExpected % numOfRacks == 0) {
+ writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+ return writer;
+ }
+
+ assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+ // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+ // replicas.
+ HashMap<String, Integer> rackCounts = new HashMap<>();
+ for (DatanodeStorageInfo dsInfo : results) {
+ String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+ Integer count = rackCounts.get(rack);
+ if (count != null) {
+ rackCounts.put(rack, count + 1);
+ } else {
+ rackCounts.put(rack, 1);
+ }
+ }
+ int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+ for (int count : rackCounts.values()) {
+ if (count > maxNodesPerRack -1) {
+ excess += count - (maxNodesPerRack -1);
+ }
+ }
+ numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+ (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+ // Fill each rack exactly (maxNodesPerRack-1) replicas.
+ writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+ blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+ for (DatanodeStorageInfo resultStorage : results) {
+ addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+ }
+
+ // For some racks, place one more replica to each one of them.
+ numOfReplicas = totalReplicaExpected - results.size();
+ chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+ return writer;
+ }
+
+ /**
+ * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+ * Except that 1st replica prefer local storage.
+ * @return local node of writer.
+ */
+ private Node chooseOnce(int numOfReplicas,
+ Node writer,
+ final Set<Node> excludedNodes,
+ final long blocksize,
+ final int maxNodesPerRack,
+ final List<DatanodeStorageInfo> results,
+ final boolean avoidStaleNodes,
+ EnumMap<StorageType, Integer> storageTypes)
+ throws NotEnoughReplicasException {
+ if (numOfReplicas == 0) {
+ return writer;
+ }
+ writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+ .getDatanodeDescriptor();
+ if (--numOfReplicas == 0) {
+ return writer;
+ }
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+ return writer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a80a68c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
deleted file mode 100644
index d86a267..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.net.StaticMapping;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestBlockPlacementPolicyRackFaultTolarent {
-
- private static final int DEFAULT_BLOCK_SIZE = 1024;
- private MiniDFSCluster cluster = null;
- private NamenodeProtocols nameNodeRpc = null;
- private FSNamesystem namesystem = null;
- private PermissionStatus perm = null;
-
- @Before
- public void setup() throws IOException {
- StaticMapping.resetMap();
- Configuration conf = new HdfsConfiguration();
- final ArrayList<String> rackList = new ArrayList<String>();
- final ArrayList<String> hostList = new ArrayList<String>();
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 2; j++) {
- rackList.add("/rack" + i);
- hostList.add("/host" + i + j);
- }
- }
- conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
- BlockPlacementPolicyRackFaultTolarent.class,
- BlockPlacementPolicy.class);
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(hostList.size())
- .racks(rackList.toArray(new String[rackList.size()]))
- .hosts(hostList.toArray(new String[hostList.size()]))
- .build();
- cluster.waitActive();
- nameNodeRpc = cluster.getNameNodeRpc();
- namesystem = cluster.getNamesystem();
- perm = new PermissionStatus("TestBlockPlacementPolicyEC", null,
- FsPermission.getDefault());
- }
-
- @After
- public void teardown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testChooseTarget() throws Exception {
- doTestChooseTargetNormalCase();
- doTestChooseTargetSpecialCase();
- }
-
- private void doTestChooseTargetNormalCase() throws Exception {
- String clientMachine = "client.foo.com";
- short[][] testSuite = {
- {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 1}, {10, 1}, {10, 6}, {11, 6},
- {11, 9}
- };
- // Test 5 files
- int fileCount = 0;
- for (int i = 0; i < 5; i++) {
- for (short[] testCase : testSuite) {
- short replication = testCase[0];
- short additionalReplication = testCase[1];
- String src = "/testfile" + (fileCount++);
- // Create the file with client machine
- HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
- clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
- replication, DEFAULT_BLOCK_SIZE, null, false);
-
- //test chooseTarget for new file
- LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
- null, null, fileStatus.getFileId(), null);
- doTestLocatedBlock(replication, locatedBlock);
-
- //test chooseTarget for existing file.
- LocatedBlock additionalLocatedBlock =
- nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
- locatedBlock.getBlock(), locatedBlock.getLocations(),
- locatedBlock.getStorageIDs(), new DatanodeInfo[0],
- additionalReplication, clientMachine);
- doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock);
- }
- }
- }
-
- /**
- * Test more randomly. So it covers some special cases.
- * Like when some racks already have 2 replicas, while some racks have none,
- * we should choose the racks that have none.
- */
- private void doTestChooseTargetSpecialCase() throws Exception {
- String clientMachine = "client.foo.com";
- // Test 5 files
- String src = "/testfile_1_";
- // Create the file with client machine
- HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
- clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
- (short) 20, DEFAULT_BLOCK_SIZE, null, false);
-
- //test chooseTarget for new file
- LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
- null, null, fileStatus.getFileId(), null);
- doTestLocatedBlock(20, locatedBlock);
-
- DatanodeInfo[] locs = locatedBlock.getLocations();
- String[] storageIDs = locatedBlock.getStorageIDs();
-
- for (int time = 0; time < 5; time++) {
- shuffle(locs, storageIDs);
- for (int i = 1; i < locs.length; i++) {
- DatanodeInfo[] partLocs = new DatanodeInfo[i];
- String[] partStorageIDs = new String[i];
- System.arraycopy(locs, 0, partLocs, 0, i);
- System.arraycopy(storageIDs, 0, partStorageIDs, 0, i);
- for (int j = 1; j < 20 - i; j++) {
- LocatedBlock additionalLocatedBlock =
- nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
- locatedBlock.getBlock(), partLocs,
- partStorageIDs, new DatanodeInfo[0],
- j, clientMachine);
- doTestLocatedBlock(i + j, additionalLocatedBlock);
- }
- }
- }
- }
-
- private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
- int length = locs.length;
- Object[][] pairs = new Object[length][];
- for (int i = 0; i < length; i++) {
- pairs[i] = new Object[]{locs[i], storageIDs[i]};
- }
- DFSUtil.shuffle(pairs);
- for (int i = 0; i < length; i++) {
- locs[i] = (DatanodeInfo) pairs[i][0];
- storageIDs[i] = (String) pairs[i][1];
- }
- }
-
- private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
- assertEquals(replication, locatedBlock.getLocations().length);
-
- HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
- for (DatanodeInfo node :
- locatedBlock.getLocations()) {
- addToRacksCount(node.getNetworkLocation(), racksCount);
- }
-
- int minCount = Integer.MAX_VALUE;
- int maxCount = Integer.MIN_VALUE;
- for (Integer rackCount : racksCount.values()) {
- minCount = Math.min(minCount, rackCount);
- maxCount = Math.max(maxCount, rackCount);
- }
- assertTrue(maxCount - minCount <= 1);
- }
-
- private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
- Integer count = racksCount.get(rack);
- if (count == null) {
- racksCount.put(rack, 1);
- } else {
- racksCount.put(rack, count + 1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a80a68c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
new file mode 100644
index 0000000..ca9da77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockPlacementPolicyRackFaultTolerant {
+
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
+ private MiniDFSCluster cluster = null;
+ private NamenodeProtocols nameNodeRpc = null;
+ private FSNamesystem namesystem = null;
+ private PermissionStatus perm = null;
+
+ @Before
+ public void setup() throws IOException {
+ StaticMapping.resetMap();
+ Configuration conf = new HdfsConfiguration();
+ final ArrayList<String> rackList = new ArrayList<String>();
+ final ArrayList<String> hostList = new ArrayList<String>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 2; j++) {
+ rackList.add("/rack" + i);
+ hostList.add("/host" + i + j);
+ }
+ }
+ conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ BlockPlacementPolicyRackFaultTolerant.class,
+ BlockPlacementPolicy.class);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(hostList.size())
+ .racks(rackList.toArray(new String[rackList.size()]))
+ .hosts(hostList.toArray(new String[hostList.size()]))
+ .build();
+ cluster.waitActive();
+ nameNodeRpc = cluster.getNameNodeRpc();
+ namesystem = cluster.getNamesystem();
+ perm = new PermissionStatus("TestBlockPlacementPolicyEC", null,
+ FsPermission.getDefault());
+ }
+
+ @After
+ public void teardown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testChooseTarget() throws Exception {
+ doTestChooseTargetNormalCase();
+ doTestChooseTargetSpecialCase();
+ }
+
+ private void doTestChooseTargetNormalCase() throws Exception {
+ String clientMachine = "client.foo.com";
+ short[][] testSuite = {
+ {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 1}, {10, 1}, {10, 6}, {11, 6},
+ {11, 9}
+ };
+ // Test 5 files
+ int fileCount = 0;
+ for (int i = 0; i < 5; i++) {
+ for (short[] testCase : testSuite) {
+ short replication = testCase[0];
+ short additionalReplication = testCase[1];
+ String src = "/testfile" + (fileCount++);
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+ clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+ replication, DEFAULT_BLOCK_SIZE, null, false);
+
+ //test chooseTarget for new file
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+ null, null, fileStatus.getFileId(), null);
+ doTestLocatedBlock(replication, locatedBlock);
+
+ //test chooseTarget for existing file.
+ LocatedBlock additionalLocatedBlock =
+ nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+ locatedBlock.getBlock(), locatedBlock.getLocations(),
+ locatedBlock.getStorageIDs(), new DatanodeInfo[0],
+ additionalReplication, clientMachine);
+ doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock);
+ }
+ }
+ }
+
+ /**
+ * Test more randomly. So it covers some special cases.
+ * Like when some racks already have 2 replicas, while some racks have none,
+ * we should choose the racks that have none.
+ */
+ private void doTestChooseTargetSpecialCase() throws Exception {
+ String clientMachine = "client.foo.com";
+ // Test 5 files
+ String src = "/testfile_1_";
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+ clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+ (short) 20, DEFAULT_BLOCK_SIZE, null, false);
+
+ //test chooseTarget for new file
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+ null, null, fileStatus.getFileId(), null);
+ doTestLocatedBlock(20, locatedBlock);
+
+ DatanodeInfo[] locs = locatedBlock.getLocations();
+ String[] storageIDs = locatedBlock.getStorageIDs();
+
+ for (int time = 0; time < 5; time++) {
+ shuffle(locs, storageIDs);
+ for (int i = 1; i < locs.length; i++) {
+ DatanodeInfo[] partLocs = new DatanodeInfo[i];
+ String[] partStorageIDs = new String[i];
+ System.arraycopy(locs, 0, partLocs, 0, i);
+ System.arraycopy(storageIDs, 0, partStorageIDs, 0, i);
+ for (int j = 1; j < 20 - i; j++) {
+ LocatedBlock additionalLocatedBlock =
+ nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+ locatedBlock.getBlock(), partLocs,
+ partStorageIDs, new DatanodeInfo[0],
+ j, clientMachine);
+ doTestLocatedBlock(i + j, additionalLocatedBlock);
+ }
+ }
+ }
+ }
+
+ private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
+ int length = locs.length;
+ Object[][] pairs = new Object[length][];
+ for (int i = 0; i < length; i++) {
+ pairs[i] = new Object[]{locs[i], storageIDs[i]};
+ }
+ DFSUtil.shuffle(pairs);
+ for (int i = 0; i < length; i++) {
+ locs[i] = (DatanodeInfo) pairs[i][0];
+ storageIDs[i] = (String) pairs[i][1];
+ }
+ }
+
+ private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
+ assertEquals(replication, locatedBlock.getLocations().length);
+
+ HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+ for (DatanodeInfo node :
+ locatedBlock.getLocations()) {
+ addToRacksCount(node.getNetworkLocation(), racksCount);
+ }
+
+ int minCount = Integer.MAX_VALUE;
+ int maxCount = Integer.MIN_VALUE;
+ for (Integer rackCount : racksCount.values()) {
+ minCount = Math.min(minCount, rackCount);
+ maxCount = Math.max(maxCount, rackCount);
+ }
+ assertTrue(maxCount - minCount <= 1);
+ }
+
+ private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
+ Integer count = racksCount.get(rack);
+ if (count == null) {
+ racksCount.put(rack, 1);
+ } else {
+ racksCount.put(rack, count + 1);
+ }
+ }
+}