Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/02/07 00:13:25 UTC
svn commit: r741776 - in /hadoop/core/trunk: CHANGES.txt
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Author: szetszwo
Date: Fri Feb 6 23:13:25 2009
New Revision: 741776
URL: http://svn.apache.org/viewvc?rev=741776&view=rev
Log:
HADOOP-5124. Choose datanodes randomly instead of starting from the first for providing fairness. (hairong via szetszwo)
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=741776&r1=741775&r2=741776&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 6 23:13:25 2009
@@ -71,6 +71,9 @@
HADOOP-3327. Improves handling of READ_TIMEOUT during map output copying.
(Amareshwari Sriramadasu via ddas)
+ HADOOP-5124. Choose datanodes randomly instead of starting from the first
+ datanode for providing fairness. (hairong via szetszwo)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=741776&r1=741775&r2=741776&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Feb 6 23:13:25 2009
@@ -264,7 +264,7 @@
private final GenerationStamp generationStamp = new GenerationStamp();
// Ask Datanode only up to this many blocks to delete.
- private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
+ int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
// precision of access times.
private long accessTimePrecision = 0;
@@ -1146,8 +1146,12 @@
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
- for(Collection<Block> v : recentInvalidateSets.values()) {
- v.remove(last);
+ for (DatanodeDescriptor dd : targets) {
+ String datanodeId = dd.getStorageID();
+ Collection<Block> v = recentInvalidateSets.get(datanodeId);
+ if (v != null && v.remove(last) && v.isEmpty()) {
+ recentInvalidateSets.remove(datanodeId);
+ }
}
}
}
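
The new loop above visits only the datanodes that actually hold the block, instead of scanning every entry in recentInvalidateSets, and it prunes a node's entry once its set becomes empty so later scheduling never selects a node with nothing to delete. A minimal standalone sketch of the same pattern, with stand-in types rather than the actual FSNamesystem classes:

// A minimal sketch of the targeted-removal pattern; "pendingDeletes"
// stands in for recentInvalidateSets and Long for Block.
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class TargetedRemovalSketch {
  static final Map<String, Collection<Long>> pendingDeletes =
      new HashMap<String, Collection<Long>>();

  // Remove the block only from the sets of the datanodes that hold it,
  // pruning any entry whose set becomes empty.
  static void removeFromTargets(long block, String[] targetIds) {
    for (String id : targetIds) {
      Collection<Long> set = pendingDeletes.get(id);
      if (set != null && set.remove(block) && set.isEmpty()) {
        pendingDeletes.remove(id);   // keep the map free of empty sets
      }
    }
  }
}
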
@@ -1451,7 +1455,7 @@
* @param b block
* @param n datanode
*/
- private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
+ void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
if (invalidateSet == null) {
invalidateSet = new HashSet<Block>();
@@ -2296,13 +2300,37 @@
return workFound;
}
- private int computeInvalidateWork(int nodesToProcess) {
+ /**
+ * Schedule blocks for deletion at datanodes
+ * @param nodesToProcess number of datanodes to schedule deletion work for
+ * @return total number of blocks scheduled for deletion
+ */
+ int computeInvalidateWork(int nodesToProcess) {
+ int numOfNodes = recentInvalidateSets.size();
+ nodesToProcess = Math.min(numOfNodes, nodesToProcess);
+
+ // get an array of the keys
+ ArrayList<String> keyArray =
+ new ArrayList<String>(recentInvalidateSets.keySet());
+
+ // randomly pick <i>nodesToProcess</i> nodes
+ // and put them at [0, nodesToProcess)
+ int remainingNodes = numOfNodes - nodesToProcess;
+ if (nodesToProcess < remainingNodes) {
+ for(int i=0; i<nodesToProcess; i++) {
+ int keyIndex = r.nextInt(numOfNodes-i)+i;
+ Collections.swap(keyArray, keyIndex, i); // swap to front
+ }
+ } else {
+ for(int i=0; i<remainingNodes; i++) {
+ int keyIndex = r.nextInt(numOfNodes-i);
+ Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
+ }
+ }
+
int blockCnt = 0;
for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
- int work = invalidateWorkForOneNode();
- if(work == 0)
- break;
- blockCnt += work;
+ blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
}
return blockCnt;
}
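
The selection above is a partial Fisher-Yates shuffle: when fewer than half the keys are needed it swaps nodesToProcess random picks to the front of the array; otherwise it is cheaper to swap the numOfNodes - nodesToProcess rejects to the end, which leaves the picks at [0, nodesToProcess) either way. A self-contained sketch of the technique, independent of FSNamesystem:

// Pick k distinct elements uniformly at random and leave them at [0, k),
// doing min(k, n-k) swaps.
import java.util.Collections;
import java.util.List;
import java.util.Random;

class RandomPickSketch {
  private static final Random r = new Random();

  static <T> void pickK(List<T> keys, int k) {
    int n = keys.size();
    if (k < n - k) {
      // Few picks: swap k random elements to the front.
      for (int i = 0; i < k; i++) {
        Collections.swap(keys, r.nextInt(n - i) + i, i);
      }
    } else {
      // Many picks: swap the n-k rejects to the end instead.
      for (int i = 0; i < n - k; i++) {
        Collections.swap(keys, r.nextInt(n - i), n - i - 1);
      }
    }
  }
}

Either branch yields a uniformly random subset; the condition merely chooses whichever direction needs fewer swaps.
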
@@ -2498,24 +2526,25 @@
}
/**
- * Get blocks to invalidate for the first node
+ * Get blocks to invalidate for <i>nodeId</i>
* in {@link #recentInvalidateSets}.
*
* @return number of blocks scheduled for removal during this iteration.
*/
- private synchronized int invalidateWorkForOneNode() {
+ private synchronized int invalidateWorkForOneNode(String nodeId) {
// blocks should not be replicated or removed if safe mode is on
if (isInSafeMode())
return 0;
- if(recentInvalidateSets.isEmpty())
+ // get blocks to invalidate for the nodeId
+ assert nodeId != null;
+ DatanodeDescriptor dn = datanodeMap.get(nodeId);
+ if (dn == null) {
+ recentInvalidateSets.remove(nodeId);
return 0;
- // get blocks to invalidate for the first node
- String firstNodeId = recentInvalidateSets.keySet().iterator().next();
- assert firstNodeId != null;
- DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
- Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
-
- if(invalidateSet == null || dn == null)
+ }
+
+ Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
+ if (invalidateSet == null)
return 0;
ArrayList<Block> blocksToInvalidate =
@@ -2529,10 +2558,9 @@
it.remove();
}
- // If we could not send everything in this message, reinsert this item
- // into the collection.
- if(it.hasNext())
- recentInvalidateSets.put(firstNodeId, invalidateSet);
+ // If we send everything in this message, remove this node entry
+ if(!it.hasNext())
+ recentInvalidateSets.remove(nodeId);
dn.addBlocksToBeInvalidated(blocksToInvalidate);
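
With this change, invalidateWorkForOneNode drains at most blockInvalidateLimit blocks per call and keeps the node's entry in recentInvalidateSets until its set is fully drained, where the old code removed the whole entry up front and reinserted the leftovers. A standalone sketch of the drain step, again with stand-in types rather than the actual FSNamesystem code:

// Move up to LIMIT blocks for nodeId into a batch per call; LIMIT plays
// the role of blockInvalidateLimit.
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class DrainSketch {
  static final int LIMIT = 100;
  static final Map<String, Collection<Long>> pendingDeletes =
      new HashMap<String, Collection<Long>>();

  // Drop the map entry only once the node's set has been fully drained.
  static List<Long> drainForNode(String nodeId) {
    List<Long> batch = new ArrayList<Long>();
    Collection<Long> set = pendingDeletes.get(nodeId);
    if (set == null) {
      return batch;
    }
    for (Iterator<Long> it = set.iterator();
         it.hasNext() && batch.size() < LIMIT; ) {
      batch.add(it.next());
      it.remove();
    }
    if (set.isEmpty()) {   // everything sent: remove the node entry
      pendingDeletes.remove(nodeId);
    }
    return batch;
  }
}
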
Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java?rev=741776&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java Fri Feb 6 23:13:25 2009
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+
+import junit.framework.TestCase;
+
+/**
+ * Test if FSNamesystem schedules block invalidation work correctly
+ */
+public class TestComputeInvalidateWork extends TestCase {
+ /**
+ * Test if {@link FSNamesystem#computeInvalidateWork(int)}
+ * can schedule invalidate work correctly
+ */
+ public void testCompInvalidate() throws Exception {
+ final Configuration conf = new Configuration();
+ final int NUM_OF_DATANODES = 3;
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, NUM_OF_DATANODES, true, null);
+ try {
+ cluster.waitActive();
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ DatanodeDescriptor[] nodes =
+ namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
+ assertEquals(nodes.length, NUM_OF_DATANODES);
+
+ synchronized (namesystem) {
+ for (int i=0; i<nodes.length; i++) {
+ for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
+ Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
+ GenerationStamp.FIRST_VALID_STAMP);
+ namesystem.addToInvalidatesNoLog(block, nodes[i]);
+ }
+ }
+
+ assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
+ namesystem.computeInvalidateWork(NUM_OF_DATANODES+1));
+ assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
+ namesystem.computeInvalidateWork(NUM_OF_DATANODES));
+ assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
+ namesystem.computeInvalidateWork(NUM_OF_DATANODES-1));
+ int workCount = namesystem.computeInvalidateWork(1);
+ if (workCount == 1) {
+ assertEquals(namesystem.blockInvalidateLimit+1,
+ namesystem.computeInvalidateWork(2));
+ } else {
+ assertEquals(workCount, namesystem.blockInvalidateLimit);
+ assertEquals(2, namesystem.computeInvalidateWork(2));
+ }
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
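
Why the test branches on workCount: each node starts with 3*blockInvalidateLimit+1 pending blocks, and the first three calls schedule blockInvalidateLimit blocks from 3, 3, and 2 randomly chosen nodes respectively, leaving one node with blockInvalidateLimit+1 pending blocks and the other two with one block each. computeInvalidateWork(1) then picks a node at random: if it lands on a one-block node (workCount == 1), the final call must schedule the remaining blockInvalidateLimit+1 blocks; if it lands on the large node (workCount == blockInvalidateLimit), every node is left with exactly one block and the final computeInvalidateWork(2) schedules 2.
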