You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/25 21:58:55 UTC
svn commit: r499967 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/FSNamesystem.java
Author: cutting
Date: Thu Jan 25 12:58:54 2007
New Revision: 499967
URL: http://svn.apache.org/viewvc?view=rev&rev=499967
Log:
HADOOP-659. In HDFS, prioritize replication of blocks based on their current replication level. Contributed by Hairong.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=499967&r1=499966&r2=499967
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 25 12:58:54 2007
@@ -102,6 +102,11 @@
13. HADOOP-879. Fix InputFormatBase to handle output generated by
MapFileOutputFormat. (cutting)
+14. HADOOP-659. In HDFS, prioritize replication of blocks based on
+ current replication level. Blocks which are severely
+ under-replicated should be further replicated before blocks which
+ are less under-replicated. (Hairong Kuang via cutting)
+
Release 0.10.0 - 2007-01-05
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=499967&r1=499966&r2=499967
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 25 12:58:54 2007
@@ -152,7 +152,7 @@
// We also store pending replication-orders.
// Set of: Block
//
- private Collection<Block> neededReplications = new TreeSet<Block>();
+ private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks();
private Collection<Block> pendingReplications = new TreeSet<Block>();
//
@@ -277,7 +277,189 @@
}
}
}
+
+ /**
+  * Returns the replication factor of the file that owns the given block,
+  * or 0 when the block does not belong to any file.
+  */
+ private int getReplication(Block block) {
+   FSDirectory.INode owner = dir.getFileByBlock(block);
+   return (owner == null) ? 0 : owner.getReplication();
+ }
+ /*
+  * Keeps track of under-replicated blocks, bucketed into LEVEL priority
+  * queues. Priority 0 is the most urgent:
+  *   0 - the block has exactly one counted replica;
+  *   1 - the block has fewer than a third of its expected replicas
+  *       (this includes a block with zero counted replicas);
+  *   2 - any other under-replicated block.
+  * A block whose counted replicas already reach its expected replication
+  * is not queued at all; getPriority reports LEVEL for it.
+  * Replica counts come from countContainingNodes(); expected counts come
+  * from getReplication().
+  */
+ private class UnderReplicationBlocks {
+   private static final int LEVEL = 3;
+   TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
+
+   /* Creates one empty queue per priority level. */
+   UnderReplicationBlocks() {
+     for(int i=0; i<LEVEL; i++) {
+       priorityQueues[i] = new TreeSet<Block>();
+     }
+   }
+
+   /* Return the total number of under-replicated blocks in all queues. */
+   synchronized int size() {
+     int size = 0;
+     for( int i=0; i<LEVEL; i++ ) {
+       size += priorityQueues[i].size();
+     }
+     return size;
+   }
+
+   /* Return the priority of a block, or LEVEL if it needs no replication.
+    * @param block an under-replicated block
+    * @param curReplicas current number of replicas of the block
+    * @param expectedReplicas expected number of replicas of the block
+    */
+   private int getPriority(Block block,
+                           int curReplicas, int expectedReplicas) {
+     if (curReplicas>=expectedReplicas) {
+       return LEVEL; // no need to replicate
+     } else if(curReplicas==1) {
+       return 0; // highest priority
+     } else if(curReplicas*3<expectedReplicas) {
+       return 1;
+     } else {
+       return 2;
+     }
+   }
+
+   /* Add a block to the queue matching its priority.
+    * @param block an under-replicated block
+    * @param curReplicas current number of replicas of the block
+    * @param expectedReplicas expected number of replicas of the block
+    * @return true if the block was newly added to a queue; false if it
+    *         needs no replication or was already in that queue
+    */
+   synchronized boolean add(
+       Block block, int curReplicas, int expectedReplicas) {
+     if(expectedReplicas <= curReplicas) {
+       return false;
+     }
+     int priLevel = getPriority(block, curReplicas, expectedReplicas);
+     if( priorityQueues[priLevel].add(block) ) {
+       NameNode.stateChangeLog.debug(
+           "BLOCK* NameSystem.UnderReplicationBlock.add:"
+           + block.getBlockName()
+           + " has only "+curReplicas
+           + " replicas and need " + expectedReplicas
+           + " replicas so is added to neededReplications"
+           + " at priority level " + priLevel );
+       return true;
+     }
+     return false;
+   }
+
+   /* Add a block using its current replica count and expected replication. */
+   synchronized boolean add(Block block) {
+     int curReplicas = countContainingNodes(blocksMap.get(block));
+     int expectedReplicas = getReplication(block);
+     return add(block, curReplicas, expectedReplicas);
+   }
+
+   /* Remove a block from the under-replication queues.
+    * The old counts only select which queue is searched first. The block
+    * is removed from whichever queue holds it, even when the old counts
+    * say it needed no replication, because its entry may be stale.
+    * @param block the block to remove
+    * @param oldReplicas replica count the block was queued with
+    * @param oldExpectedReplicas expected replication it was queued with
+    * @return true if the block was found and removed
+    */
+   synchronized boolean remove(Block block,
+                               int oldReplicas, int oldExpectedReplicas) {
+     int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+     return remove(block, priLevel);
+   }
+
+   /* Remove a block, trying queue priLevel first and then all the others.
+    * priLevel may be LEVEL (no preferred queue); in that case every queue
+    * is searched. Callers must hold this object's lock.
+    * @return true if the block was found and removed
+    */
+   private boolean remove(Block block, int priLevel ) {
+     if( priLevel >= 0 && priLevel < LEVEL
+         && priorityQueues[priLevel].remove(block) ) {
+       NameNode.stateChangeLog.debug(
+           "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+           + "Removing block " + block.getBlockName()
+           + " from priority queue "+ priLevel );
+       return true;
+     }
+     for(int i=0; i<LEVEL; i++) {
+       if( i!=priLevel && priorityQueues[i].remove(block) ) {
+         NameNode.stateChangeLog.debug(
+             "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+             + "Removing block " + block.getBlockName()
+             + " from priority queue "+ i );
+         return true;
+       }
+     }
+     return false;
+   }
+
+   /* Remove a block using its current replica count and expected replication. */
+   synchronized boolean remove(Block block) {
+     int curReplicas = countContainingNodes(blocksMap.get(block));
+     int expectedReplicas = getReplication(block);
+     return remove(block, curReplicas, expectedReplicas);
+   }
+
+   /* Move a block to the queue that matches its current priority.
+    * The current counts are read from blocksMap and the owning file; the
+    * old counts are derived by subtracting the given deltas. A block that
+    * now needs no replication is removed from every queue, including any
+    * stale entry left behind by an earlier priority.
+    * NOTE(review): blocksMap.get(block) may be null once the last replica
+    * is gone (removeStoredBlock drops the entry before calling update);
+    * this assumes countContainingNodes tolerates null -- confirm.
+    * @param block the block whose state changed
+    * @param curReplicasDelta change in the replica count
+    * @param expectedReplicasDelta change in the expected replication
+    */
+   synchronized void update(Block block,
+                            int curReplicasDelta, int expectedReplicasDelta) {
+     int curReplicas = countContainingNodes(blocksMap.get(block));
+     int curExpectedReplicas = getReplication(block);
+     int oldReplicas = curReplicas-curReplicasDelta;
+     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+     int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+     int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+     if( curPri == LEVEL ) {
+       // Nothing left to replicate: drop the block from whatever queue
+       // still holds it (remove() searches every queue if needed).
+       remove(block, oldPri);
+       return;
+     }
+     if( oldPri == curPri ) {
+       return; // priority unchanged; the block stays where it is
+     }
+     if( oldPri != LEVEL ) {
+       remove(block, oldPri);
+     }
+     if( priorityQueues[curPri].add(block) ) {
+       NameNode.stateChangeLog.debug(
+           "BLOCK* NameSystem.UnderReplicationBlock.update:"
+           + block.getBlockName()
+           + " has only "+curReplicas
+           + " replicas and need " + curExpectedReplicas
+           + " replicas so is added to neededReplications"
+           + " at priority level " + curPri );
+     }
+   }
+
+   /* Return an iterator over all queued blocks, highest priority first.
+    * Only creating the iterator is synchronized; walking it is not. A
+    * caller iterating while other threads may modify the queues must hold
+    * this object's lock, or the underlying TreeSet iterators may fail.
+    * remove() is not supported.
+    */
+   synchronized Iterator<Block> iterator() {
+     return new Iterator<Block>() {
+       int level;
+       Iterator<Block>[] iterator = new Iterator[LEVEL];
+
+       {
+         level=0;
+         for(int i=0; i<LEVEL; i++) {
+           iterator[i] = priorityQueues[i].iterator();
+         }
+       }
+
+       // Skip exhausted queues, stopping at the last one so that
+       // hasNext()/next() report on it once everything is consumed.
+       private void update() {
+         while( level< LEVEL-1 && !iterator[level].hasNext() ) {
+           level++;
+         }
+       }
+
+       public Block next() {
+         update();
+         return iterator[level].next();
+       }
+
+       public boolean hasNext() {
+         update();
+         return iterator[level].hasNext();
+       }
+
+       public void remove() {
+         throw new UnsupportedOperationException();
+       }
+     };
+   }
+ }
+
/////////////////////////////////////////////////////////
//
// These methods are called by HadoopFS clients
@@ -347,20 +529,18 @@
if( oldRepl == replication ) // the same replication
return true;
- synchronized( neededReplications ) {
- if( oldRepl < replication ) {
- // old replication < the new one; need to replicate
- LOG.info("Increasing replication for file " + src
- + ". New replication is " + replication );
- for( int idx = 0; idx < fileBlocks.length; idx++ )
- neededReplications.add( fileBlocks[idx] );
- } else {
- // old replication > the new one; need to remove copies
- LOG.info("Reducing replication for file " + src
- + ". New replication is " + replication );
- for( int idx = 0; idx < fileBlocks.length; idx++ )
- proccessOverReplicatedBlock( fileBlocks[idx], replication );
- }
+ // update needReplication priority queues
+ LOG.info("Increasing replication for file " + src
+ + ". New replication is " + replication );
+ for( int idx = 0; idx < fileBlocks.length; idx++ )
+ neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+
+ if( oldRepl > replication ) {
+ // old replication > the new one; need to remove copies
+ LOG.info("Reducing replication for file " + src
+ + ". New replication is " + replication );
+ for( int idx = 0; idx < fileBlocks.length; idx++ )
+ proccessOverReplicatedBlock( fileBlocks[idx], replication );
}
return true;
}
@@ -715,19 +895,15 @@
// Now that the file is real, we need to be sure to replicate
// the blocks.
+ int numExpectedReplicas = pendingFile.getReplication();
for (int i = 0; i < nrBlocks; i++) {
SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
// filter out containingNodes that are marked for decommission.
int numCurrentReplica = countContainingNodes(containingNodes);
- if (numCurrentReplica < pendingFile.getReplication()) {
- NameNode.stateChangeLog.debug(
- "DIR* NameSystem.completeFile:"
- + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
- +" replicas so is added to neededReplications");
- synchronized (neededReplications) {
- neededReplications.add(pendingBlocks[i]);
- }
+ if (numCurrentReplica < numExpectedReplicas) {
+ neededReplications.add(
+ pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
}
}
return COMPLETE_SUCCESS;
@@ -1608,8 +1784,10 @@
containingNodes = new TreeSet<DatanodeDescriptor>();
blocksMap.put(block, containingNodes);
}
+ int curReplicaDelta = 0;
if (! containingNodes.contains(node)) {
containingNodes.add(node);
+ curReplicaDelta = 1;
//
// Hairong: I would prefer to set the level of next logrecord
// to be debug.
@@ -1625,34 +1803,24 @@
+ block.getBlockName() + " on " + node.getName());
}
- synchronized (neededReplications) {
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
- if( fileINode == null ) // block does not belong to any file
- return;
-
- // filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes(containingNodes);
-
- // check whether safe replication is reached for the block
- // only if it is a part of a files
- incrementSafeBlockCount( numCurrentReplica );
- short fileReplication = fileINode.getReplication();
- if (numCurrentReplica >= fileReplication ) {
- neededReplications.remove(block);
- pendingReplications.remove(block);
- NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
- +block.getBlockName()+" has "+ numCurrentReplica
- +" replicas so is removed from neededReplications and pendingReplications" );
-
- } else {// numCurrentReplica < fileReplication
- neededReplications.add(block);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
- +block.getBlockName()+" has only "+ numCurrentReplica
- +" replicas so is added to neededReplications" );
- }
-
- proccessOverReplicatedBlock( block, fileReplication );
- }
+ FSDirectory.INode fileINode = dir.getFileByBlock(block);
+ if( fileINode == null ) // block does not belong to any file
+ return;
+
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes(containingNodes);
+
+ // check whether safe replication is reached for the block
+ // only if it is a part of a files
+ incrementSafeBlockCount( numCurrentReplica );
+
+ // handle underReplication/overReplication
+ short fileReplication = fileINode.getReplication();
+ neededReplications.update(block, curReplicaDelta, 0);
+ if (numCurrentReplica >= fileReplication ) {
+ pendingReplications.remove(block);
+ }
+ proccessOverReplicatedBlock( block, fileReplication );
}
/**
@@ -1748,8 +1916,12 @@
return;
}
containingNodes.remove(node);
- decrementSafeBlockCount( containingNodes.size() );
- if( containingNodes.size() == 0 )
+
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes(containingNodes);
+
+ decrementSafeBlockCount( numCurrentReplica );
+ if( containingNodes.isEmpty() )
blocksMap.remove(block);
//
// It's possible that the block was removed because of a datanode
@@ -1758,13 +1930,8 @@
// be-replicated list.
//
FSDirectory.INode fileINode = dir.getFileByBlock(block);
- if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
- synchronized (neededReplications) {
- neededReplications.add(block);
- }
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName()+" has only "+containingNodes.size()
- +" replicas so is added to neededReplications" );
+ if( fileINode != null ) {
+ neededReplications.update(block, -1, 0);
}
//
@@ -1896,9 +2063,7 @@
// replicated.
Block decommissionBlocks[] = node.getBlocks();
for (int j = 0; j < decommissionBlocks.length; j++) {
- synchronized (neededReplications) {
- neededReplications.add(decommissionBlocks[j]);
- }
+ neededReplications.update(decommissionBlocks[j], -1, 0);
}
}
break;
@@ -2107,15 +2272,13 @@
* reside on the specified node. Otherwise returns false.
*/
private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- synchronized (neededReplications) {
for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
- Block block = it.next();
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if (containingNodes.contains(srcNode)) {
- return true;
- }
+ Block block = it.next();
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ if (containingNodes.contains(srcNode)) {
+ return true;
+ }
}
- }
return false;
}
@@ -2237,9 +2400,10 @@
DatanodeDescriptor targets[] =
(DatanodeDescriptor[]) replicateTargetSets.get(i);
int numCurrentReplica = numCurrentReplicas.get(i).intValue();
- if (numCurrentReplica + targets.length >=
- dir.getFileByBlock( block).getReplication() ) {
- neededReplications.remove(block);
+ int numExpectedReplica = dir.getFileByBlock( block).getReplication();
+ neededReplications.update(
+ block, numCurrentReplica, numExpectedReplica);
+ if (numCurrentReplica + targets.length >= numExpectedReplica) {
pendingReplications.add(block);
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.pendingTransfer: "