Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/31 01:45:55 UTC
svn commit: r809439 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: hairong
Date: Sun Aug 30 23:45:55 2009
New Revision: 809439
URL: http://svn.apache.org/viewvc?rev=809439&view=rev
Log:
HDFS-15. All replicas end up on 1 rack. Contributed by Jitendra Nath Pandey.
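In short, this change teaches the NameNode's block manager to keep a block in the neededReplications queue when all of its live replicas sit on a single rack, even if the replica count is already satisfied, and to schedule one additional replica aimed at a new rack. The following is a minimal standalone sketch of that rack check; the class and method names are illustrative only and do not appear in the patch.

import java.util.List;

class RackCheckSketch {
  // Simplified illustration of the rack check this patch adds (not the
  // committed code). "replicaRacks" stands in for the network locations of
  // a block's live, non-decommissioned, non-corrupt replicas.
  static boolean hasEnoughRacks(List<String> replicaRacks, int expectedReplicas) {
    if (expectedReplicas == 1) {
      return !replicaRacks.isEmpty();    // a single replica cannot span racks
    }
    String firstRack = null;
    for (String rack : replicaRacks) {
      if (firstRack == null) {
        firstRack = rack;                // remember the first rack seen
      } else if (!firstRack.equals(rack)) {
        return true;                     // replicas span at least two racks
      }
    }
    return false;                        // no replicas, or all on one rack
  }
}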
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sun Aug 30 23:45:55 2009
@@ -182,6 +182,8 @@
src/test/mapred-site.xml for mapreduce tests to run. (Amareshwari
Sriramadasu via szetszwo)
+ HDFS-15. All replicas end up on 1 rack. (Jitendra Nath Pandey via hairong)
+
Release 0.20.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sun Aug 30 23:45:55 2009
@@ -105,6 +105,9 @@
// Default number of replicas
int defaultReplication;
+ // variable to enable check for enough racks
+ boolean shouldCheckForEnoughRacks = true;
+
/**
* Last block index used for replication work.
*/
@@ -155,10 +158,13 @@
+ " must be less than dfs.replication.max = "
+ maxReplication);
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+ this.shouldCheckForEnoughRacks = conf.get("topology.script.file.name") == null ? false
+ : true;
FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
FSNamesystem.LOG.info("maxReplication = " + maxReplication);
FSNamesystem.LOG.info("minReplication = " + minReplication);
FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
+ FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
}
void activate() {
@@ -613,6 +619,7 @@
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
+ int additionalReplRequired;
synchronized (namesystem) {
synchronized (neededReplications) {
@@ -624,6 +631,7 @@
replIndex--;
return false;
}
+
requiredReplication = fileINode.getReplication();
// get a source data-node
@@ -640,21 +648,32 @@
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ return false;
+ }
}
+
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication - numEffectiveReplicas;
+ } else {
+ additionalReplRequired = 1; //Needed on a new rack
+ }
+
}
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- requiredReplication - numEffectiveReplicas,
- srcNode, containingNodes, null, block.getNumBytes());
+ DatanodeDescriptor targets[] =
+ replicator.chooseTarget(additionalReplRequired,
+ srcNode, containingNodes, null, block.getNumBytes());
if(targets.length == 0)
return false;
@@ -675,13 +694,25 @@
NumberReplicas numReplicas = countNodes(block);
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ return false;
+ }
+ }
+
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+ (!blockHasEnoughRacks(block)) ) {
+ if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+ //No use continuing, unless a new rack in this case
+ return false;
+ }
}
// Add block to the to be replicated list
@@ -803,10 +834,13 @@
synchronized (namesystem) {
for (int i = 0; i < timedOutItems.length; i++) {
NumberReplicas num = countNodes(timedOutItems[i]);
- neededReplications.add(timedOutItems[i],
- num.liveReplicas(),
- num.decommissionedReplicas(),
- getReplication(timedOutItems[i]));
+ if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
+ num.liveReplicas())) {
+ neededReplications.add(timedOutItems[i],
+ num.liveReplicas(),
+ num.decommissionedReplicas(),
+ getReplication(timedOutItems[i]));
+ }
}
}
/* If we know the target datanodes where the replication timedout,
@@ -1058,9 +1092,11 @@
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
- if (neededReplications.add(block, numCurrentReplica, num
- .decommissionedReplicas(), expectedReplication)) {
- nrUnderReplicated++;
+ if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+ if (neededReplications.add(block, numCurrentReplica, num
+ .decommissionedReplicas(), expectedReplication)) {
+ nrUnderReplicated++;
+ }
}
if (numCurrentReplica > expectedReplication) {
@@ -1239,8 +1275,11 @@
NumberReplicas num = countNodes(block);
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
- if (curExpectedReplicas > curReplicas) {
- status = true;
+ if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
+ if (curExpectedReplicas > curReplicas) {
+ //Set to true only if strictly under-replicated
+ status = true;
+ }
if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0) {
//
@@ -1293,16 +1332,23 @@
synchronized (namesystem) {
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
- neededReplications.update(block, repl.liveReplicas(), repl
- .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
- expectedReplicasDelta);
+ if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
+ neededReplications.update(block, repl.liveReplicas(), repl
+ .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
+ expectedReplicasDelta);
+ } else {
+ int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+ neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
+ oldExpectedReplicas);
+ }
}
}
void checkReplication(Block block, int numExpectedReplicas) {
// filter out containingNodes that are marked for decommission.
NumberReplicas number = countNodes(block);
- if (number.liveReplicas() < numExpectedReplicas) {
+ if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
neededReplications.add(block,
number.liveReplicas(),
number.decommissionedReplicas,
@@ -1384,7 +1430,68 @@
return blocksToInvalidate.size();
}
}
+
+ //Returns the number of racks over which a given block is replicated
+ //decommissioning/decommissioned nodes are not counted. corrupt replicas
+ //are also ignored
+ int getNumberOfRacks(Block b) {
+ HashSet<String> rackSet = new HashSet<String>(0);
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(b);
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
+ it.hasNext();) {
+ DatanodeDescriptor cur = it.next();
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+ String rackName = cur.getNetworkLocation();
+ if (!rackSet.contains(rackName)) {
+ rackSet.add(rackName);
+ }
+ }
+ }
+ }
+ return rackSet.size();
+ }
+ boolean blockHasEnoughRacks(Block b) {
+ if (!this.shouldCheckForEnoughRacks) {
+ return true;
+ }
+ boolean enoughRacks = false;
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(b);
+ int numExpectedReplicas = getReplication(b);
+ String rackName = null;
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
+ it.hasNext();) {
+ DatanodeDescriptor cur = it.next();
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+ if (numExpectedReplicas == 1) {
+ enoughRacks = true;
+ break;
+ }
+ String rackNameNew = cur.getNetworkLocation();
+ if (rackName == null) {
+ rackName = rackNameNew;
+ } else if (!rackName.equals(rackNameNew)) {
+ enoughRacks = true;
+ break;
+ }
+ }
+ }
+ }
+ return enoughRacks;
+ }
+
+ boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
+ if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
long getMissingBlocksCount() {
// not locking
return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
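Note that blockHasEnoughRacks() short-circuits to true when rack awareness is not configured: shouldCheckForEnoughRacks is derived from whether topology.script.file.name is set. A minimal sketch of the configuration the new tests below use to switch the check on follows; the value "xyz" is only a placeholder, since the presence of the key is all that BlockManager looks at here.

import org.apache.hadoop.conf.Configuration;

public class RackCheckConfSketch {
  // Sketch of the configuration the new tests use; only the presence of
  // topology.script.file.name matters to BlockManager here, so "xyz" is a
  // placeholder rather than a real topology script.
  public static Configuration newConf() {
    Configuration conf = new Configuration();
    conf.set("topology.script.file.name", "xyz"); // switches the rack check on
    conf.setLong("dfs.heartbeat.interval", 1L);   // fast heartbeats for the test
    conf.setInt("dfs.replication.interval", 1);   // run replication work every second
    return conf;
  }
}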
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Sun Aug 30 23:45:55 2009
@@ -26,7 +26,7 @@
* Blocks that have only one replica have the highest priority
*/
class UnderReplicatedBlocks implements Iterable<Block> {
- static final int LEVEL = 3;
+ static final int LEVEL = 4;
private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
/* constructor */
@@ -53,7 +53,7 @@
}
return size;
}
-
+
/* Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
for(TreeSet<Block> set:priorityQueues) {
@@ -71,8 +71,10 @@
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
- if (curReplicas<0 || curReplicas>=expectedReplicas) {
- return LEVEL; // no need to replicate
+ if (curReplicas<0) {
+ return LEVEL;
+ } else if (curReplicas>=expectedReplicas) {
+ return 3; // Block doesn't have enough racks
} else if(curReplicas==0) {
// If there are zero non-decommissioned replica but there are
// some decommissioned replicas, then assign them highest priority
@@ -99,7 +101,7 @@
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
- if(curReplicas<0 || expectedReplicas <= curReplicas) {
+ if(curReplicas<0) {
return false;
}
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
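With LEVEL raised from 3 to 4, a block whose replica count is met but whose replicas all sit on one rack is now queued at a new, lowest-priority level (3) instead of being skipped; BlockManager only adds such blocks when isNeededReplication() reports them. A rough sketch of the resulting mapping, with the existing under-replication levels simplified:

public class PrioritySketch {
  static final int LEVEL = 4;            // number of priority queues after this patch

  // Rough sketch only, not the committed getPriority(): the existing
  // under-replication levels are collapsed and the decommissioned-replica
  // special case is omitted.
  static int priority(int curReplicas, int expectedReplicas) {
    if (curReplicas < 0) {
      return LEVEL;                      // invalid: never queued
    }
    if (curReplicas >= expectedReplicas) {
      return 3;                          // enough replicas, but not enough racks
    }
    // A lone replica gets the highest priority; other under-replicated
    // blocks are grouped together here for brevity.
    return curReplicas == 1 ? 0 : 2;
  }
}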
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java?rev=809439&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java Sun Aug 30 23:45:55 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestBlocksWithNotEnoughRacks extends TestCase {
+
+ static {
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL) ;
+ }
+
+ //Creates a block with all datanodes on same rack
+ //Adds additional datanode on a different rack
+ //The block should be replicated to the new rack
+ public void testSufficientlyReplicatedBlocksWithNotEnoughRacks() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong("dfs.heartbeat.interval", 1L);
+ conf.setInt("dfs.replication.interval", 1);
+ conf.set("topology.script.file.name", "xyz");
+ final short REPLICATION_FACTOR = 3;
+ final String FILE_NAME = "/testFile";
+ final Path FILE_PATH = new Path(FILE_NAME);
+ //All datanodes are on the same rack
+ String racks[] = {"/rack1","/rack1","/rack1",} ;
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, racks);
+ try {
+ // create a file with one block with a replication factor of 3
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+ DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+
+ Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ int numRacks = namesystem.blockManager.getNumberOfRacks(b);
+
+ //Add a new datanode on a different rack
+ String newRacks[] = {"/rack2"} ;
+ cluster.startDataNodes(conf, 1, true, null, newRacks);
+
+ Thread.sleep(5000);
+
+ numRacks = namesystem.blockManager.getNumberOfRacks(b);
+ NumberReplicas number = namesystem.blockManager.countNodes(b);
+ int curReplicas = number.liveReplicas();
+
+ System.out.println("curReplicas = " + curReplicas);
+ System.out.println("numRacks = " + numRacks);
+ System.out.println("Size = " + namesystem.blockManager.neededReplications.size());
+
+ assertEquals(2,numRacks);
+ assertTrue(curReplicas == REPLICATION_FACTOR);
+ assertEquals(0,namesystem.blockManager.neededReplications.size());
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+
+ public void testUnderReplicatedNotEnoughRacks() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong("dfs.heartbeat.interval", 1L);
+ conf.setInt("dfs.replication.interval", 1);
+ conf.setInt("dfs.replication.pending.timeout.sec", 1);
+ conf.set("topology.script.file.name", "xyz");
+ short REPLICATION_FACTOR = 3;
+ final String FILE_NAME = "/testFile";
+ final Path FILE_PATH = new Path(FILE_NAME);
+ //All datanodes are on the same rack
+ String racks[] = {"/rack1","/rack1","/rack1",} ;
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, racks);
+ try {
+ // create a file with one block with a replication factor of 3
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+ DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+
+ Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ int numRacks = namesystem.blockManager.getNumberOfRacks(b);
+
+ //Add a new datanode on a different rack
+ String newRacks[] = {"/rack2","/rack2"} ;
+ cluster.startDataNodes(conf, 2, true, null, newRacks);
+ REPLICATION_FACTOR = 5;
+ namesystem.setReplication(FILE_NAME, REPLICATION_FACTOR);
+ Thread.sleep(30000);
+
+
+ numRacks = namesystem.blockManager.getNumberOfRacks(b);
+ NumberReplicas number = namesystem.blockManager.countNodes(b);
+ int curReplicas = number.liveReplicas();
+
+ System.out.println("curReplicas = " + curReplicas);
+ System.out.println("numRacks = " + numRacks);
+ System.out.println("Size = " + namesystem.blockManager.neededReplications.size());
+
+ assertEquals(2,numRacks);
+ assertTrue(curReplicas == REPLICATION_FACTOR);
+ assertEquals(0,namesystem.blockManager.neededReplications.size());
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+}