You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2011/11/08 01:16:24 UTC
svn commit: r1199024 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
Author: hairong
Date: Tue Nov 8 00:16:23 2011
New Revision: 1199024
URL: http://svn.apache.org/viewvc?rev=1199024&view=rev
Log:
HDFS-2495. Increase granularity of write operations in ReplicationMonitor thus reducing contention for write lock. Contributed by Tomasz Nykiel.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 8 00:16:23 2011
@@ -52,9 +52,14 @@ Trunk (unreleased changes)
HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
+
+ OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)
+ HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+ thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Nov 8 00:16:23 2011
@@ -929,15 +929,7 @@ public class BlockManager {
chooseUnderReplicatedBlocks(blocksToProcess);
// replicate blocks
- int scheduledReplicationCount = 0;
- for (int i=0; i<blocksToReplicate.size(); i++) {
- for(Block block : blocksToReplicate.get(i)) {
- if (computeReplicationWorkForBlock(block, i)) {
- scheduledReplicationCount++;
- }
- }
- }
- return scheduledReplicationCount;
+ return computeReplicationWorkForBlocks(blocksToReplicate);
}
/**
@@ -1002,170 +994,201 @@ public class BlockManager {
return blocksToReplicate;
}
- /** Replicate a block
+ /** Replicate a set of blocks
*
- * @param block block to be replicated
- * @param priority a hint of its priority in the neededReplication queue
- * @return if the block gets replicated or not
+ * @param blocksToReplicate blocks to be replicated, for each priority
+ * @return the number of blocks scheduled for replication
*/
@VisibleForTesting
- boolean computeReplicationWorkForBlock(Block block, int priority) {
+ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
DatanodeDescriptor srcNode;
INodeFile fileINode = null;
int additionalReplRequired;
+ int scheduledWork = 0;
+ List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+
namesystem.writeLock();
try {
synchronized (neededReplications) {
- // block should belong to a file
- fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
- }
+ for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+ for (Block block : blocksToReplicate.get(priority)) {
+ // block should belong to a file
+ fileINode = blocksMap.getINode(block);
+ // abandoned block or block reopened for append
+ if(fileINode == null || fileINode.isUnderConstruction()) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ continue;
+ }
- requiredReplication = fileINode.getReplication();
+ requiredReplication = fileINode.getReplication();
- // get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
- NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(
- block, containingNodes, liveReplicaNodes, numReplicas);
- if(srcNode == null) // block can not be replicated from any node
- return false;
-
- assert liveReplicaNodes.size() == numReplicas.liveReplicas();
- // do not schedule more if enough replicas is already pending
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
+ // get a source data-node
+ containingNodes = new ArrayList<DatanodeDescriptor>();
+ liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+ NumberReplicas numReplicas = new NumberReplicas();
+ srcNode = chooseSourceDatanode(
+ block, containingNodes, liveReplicaNodes, numReplicas);
+ if(srcNode == null) // block can not be replicated from any node
+ continue;
+
+ assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+ // do not schedule more if enough replicas are already pending
+ numEffectiveReplicas = numReplicas.liveReplicas() +
+ pendingReplications.getNumReplicas(block);
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
- }
- }
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ continue;
+ }
+ }
- if (numReplicas.liveReplicas() < requiredReplication) {
- additionalReplRequired = requiredReplication - numEffectiveReplicas;
- } else {
- additionalReplRequired = 1; //Needed on a new rack
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication
+ - numEffectiveReplicas;
+ } else {
+ additionalReplRequired = 1; // Needed on a new rack
+ }
+ work.add(new ReplicationWork(block, fileINode, srcNode,
+ containingNodes, liveReplicaNodes, additionalReplRequired,
+ priority));
+ }
}
-
}
} finally {
namesystem.writeUnlock();
}
-
- // Exclude all of the containing nodes from being targets.
- // This list includes decommissioning or corrupt nodes.
- HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
- for (DatanodeDescriptor dn : containingNodes) {
- excludedNodes.put(dn, dn);
- }
- // choose replication targets: NOT HOLDING THE GLOBAL LOCK
- // It is costly to extract the filename for which chooseTargets is called,
- // so for now we pass in the Inode itself.
- DatanodeDescriptor targets[] =
- blockplacement.chooseTarget(fileINode, additionalReplRequired,
- srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
- if(targets.length == 0)
- return false;
+ HashMap<Node, Node> excludedNodes
+ = new HashMap<Node, Node>();
+ for(ReplicationWork rw : work){
+ // Exclude all of the containing nodes from being targets.
+ // This list includes decommissioning or corrupt nodes.
+ excludedNodes.clear();
+ for (DatanodeDescriptor dn : rw.containingNodes) {
+ excludedNodes.put(dn, dn);
+ }
+
+ // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+ // It is costly to extract the filename for which chooseTargets is called,
+ // so for now we pass in the Inode itself.
+ rw.targets = blockplacement.chooseTarget(rw.fileINode,
+ rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+ excludedNodes, rw.block.getNumBytes());
+ }
namesystem.writeLock();
try {
- synchronized (neededReplications) {
- // Recheck since global lock was released
- // block should belong to a file
- fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
+ for(ReplicationWork rw : work){
+ DatanodeDescriptor[] targets = rw.targets;
+ if(targets == null || targets.length == 0){
+ rw.targets = null;
+ continue;
}
- requiredReplication = fileINode.getReplication();
- // do not schedule more if enough replicas is already pending
- NumberReplicas numReplicas = countNodes(block);
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
-
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ synchronized (neededReplications) {
+ Block block = rw.block;
+ int priority = rw.priority;
+ // Recheck since global lock was released
+ // block should belong to a file
+ fileINode = blocksMap.getINode(block);
+ // abandoned block or block reopened for append
+ if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
+ rw.targets = null;
replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
+ continue;
}
- }
+ requiredReplication = fileINode.getReplication();
- if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
- if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
- //No use continuing, unless a new rack in this case
- return false;
+ // do not schedule more if enough replicas are already pending
+ NumberReplicas numReplicas = countNodes(block);
+ numEffectiveReplicas = numReplicas.liveReplicas() +
+ pendingReplications.getNumReplicas(block);
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ rw.targets = null;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ continue;
+ }
}
- }
- // Add block to the to be replicated list
- srcNode.addBlockToBeReplicated(block, targets);
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+ (!blockHasEnoughRacks(block)) ) {
+ if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+ // No use continuing unless the first target is on a different rack than the source
+ continue;
+ }
+ }
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
- }
+ // Add block to the to be replicated list
+ rw.srcNode.addBlockToBeReplicated(block, targets);
+ scheduledWork++;
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.add(block, targets.length);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* block " + block
- + " is moved from neededReplications to pendingReplications");
- }
+ for (DatanodeDescriptor dn : targets) {
+ dn.incBlocksScheduled();
+ }
+
+ // Move the block-replication into a "pending" state.
+ // The reason we use 'pending' is so we can retry
+ // replications that fail after an appropriate amount of time.
+ pendingReplications.add(block, targets.length);
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* block " + block
+ + " is moved from neededReplications to pendingReplications");
+ }
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
+ // remove from neededReplications
+ if(numEffectiveReplicas + targets.length >= requiredReplication) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ }
}
- if (NameNode.stateChangeLog.isInfoEnabled()) {
+ }
+ } finally {
+ namesystem.writeUnlock();
+ }
+
+ if (NameNode.stateChangeLog.isInfoEnabled()) {
+ // log which blocks have been scheduled for replication
+ for(ReplicationWork rw : work){
+ DatanodeDescriptor[] targets = rw.targets;
+ if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) {
targetList.append(' ');
targetList.append(targets[k].getName());
}
NameNode.stateChangeLog.info(
- "BLOCK* ask "
- + srcNode.getName() + " to replicate "
- + block + " to " + targetList);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* neededReplications = " + neededReplications.size()
- + " pendingReplications = " + pendingReplications.size());
- }
+ "BLOCK* ask "
+ + rw.srcNode.getName() + " to replicate "
+ + rw.block + " to " + targetList);
}
}
- } finally {
- namesystem.writeUnlock();
+ }
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* neededReplications = " + neededReplications.size()
+ + " pendingReplications = " + pendingReplications.size());
}
- return true;
+ return scheduledWork;
}
/**
@@ -2596,4 +2619,34 @@ public class BlockManager {
return workFound;
}
+ private static class ReplicationWork {
+
+ private Block block;
+ private INodeFile fileINode;
+
+ private DatanodeDescriptor srcNode;
+ private List<DatanodeDescriptor> containingNodes;
+ private List<DatanodeDescriptor> liveReplicaNodes;
+ private int additionalReplRequired;
+
+ private DatanodeDescriptor targets[];
+ private int priority;
+
+ public ReplicationWork(Block block,
+ INodeFile fileINode,
+ DatanodeDescriptor srcNode,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeDescriptor> liveReplicaNodes,
+ int additionalReplRequired,
+ int priority) {
+ this.block = block;
+ this.fileINode = fileINode;
+ this.srcNode = srcNode;
+ this.containingNodes = containingNodes;
+ this.liveReplicaNodes = liveReplicaNodes;
+ this.additionalReplRequired = additionalReplRequired;
+ this.priority = priority;
+ this.targets = null;
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1199024&r1=1199023&r2=1199024&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Nov 8 00:16:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.bl
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -355,25 +356,35 @@ public class TestBlockManager {
bm.blocksMap.addINode(blockInfo, iNode);
return blockInfo;
}
-
+
private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
- assertEquals("Block not initially pending replication",
- 0, bm.pendingReplications.getNumReplicas(block));
- assertTrue("computeReplicationWork should indicate replication is needed",
- bm.computeReplicationWorkForBlock(block, 1));
+ // list for priority 1
+ List<Block> list_p1 = new ArrayList<Block>();
+ list_p1.add(block);
+
+ // list of lists for each priority
+ List<List<Block>> list_all = new ArrayList<List<Block>>();
+ list_all.add(new ArrayList<Block>()); // for priority 0
+ list_all.add(list_p1); // for priority 1
+
+ assertEquals("Block not initially pending replication", 0,
+ bm.pendingReplications.getNumReplicas(block));
+ assertEquals(
+ "computeReplicationWork should indicate replication is needed", 1,
+ bm.computeReplicationWorkForBlocks(list_all));
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);
-
- LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
- getAllPendingReplications();
+
+ LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
assertEquals(1, repls.size());
- Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+ Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries()
+ .iterator().next();
DatanodeDescriptor[] targets = repl.getValue().targets;
-
+
DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
pipeline[0] = repl.getKey();
System.arraycopy(targets, 0, pipeline, 1, targets.length);
-
+
return pipeline;
}