You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/02 21:17:07 UTC
svn commit: r227068 -
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
Author: mc
Date: Tue Aug 2 12:17:05 2005
New Revision: 227068
URL: http://svn.apache.org/viewcvs?rev=227068&view=rev
Log:
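One feature, one bug fix.
Bug: We were mistakenly generating negative indices when
choosing a datanode, because r.nextInt() % size is negative whenever
r.nextInt() returns a negative value. We now use r.nextInt(size). No more!
Feature: We can now really choose a target datanode weighted by its
available capacity (enabled via the ndfs.availability.allocation
property, which defaults to false). This used to be broken (and was commented out).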
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=227068&r1=227067&r2=227068&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Aug 2 12:17:05 2005
@@ -47,6 +47,9 @@
// HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
final static long HEARTBEAT_RECHECK = 1000;
+ // Whether to weight target-datanode selection by each node's remaining disk space
+ final static boolean USE_AVAILABILITY = NutchConf.get().getBoolean("ndfs.availability.allocation", false);
+
//
// Stores the correct file name hierarchy
//
@@ -1176,6 +1179,7 @@
//
// Now build list of machines we can actually choose from
//
+ long totalRemaining = 0;
Vector targetList = new Vector();
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
@@ -1183,6 +1187,7 @@
(forbidden2 == null || ! forbidden2.contains(node)) &&
(! forbiddenMachines.contains(node.getHost()))) {
targetList.add(node);
+ totalRemaining += node.getRemaining();
}
}
@@ -1191,52 +1196,23 @@
//
if (targetList.size() == 0) {
return null;
+ } else if (! USE_AVAILABILITY) {
+ int target = r.nextInt(targetList.size());
+ return (DatanodeInfo) targetList.elementAt(target);
} else {
- return (DatanodeInfo) targetList.elementAt(r.nextInt() % targetList.size());
- }
- /**
- * Choose target weighted by available storage
- */
- /**
- synchronized (datanodeMap) {
- if (datanodeMap.size() == 0) {
- return;
- }
+ // Choose node according to target capacity
+ double target = r.nextDouble() * totalRemaining;
- long totalRemaining = 0;
- Vector okTargets = new Vector();
- for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
+ for (Iterator it = targetList.iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
- if ((alreadyHasNode == null || ! alreadyHasNode.contains(node)) &&
- (alreadyChosen == null || ! alreadyChosen.contains(node))) {
- okTargets.add(node);
- totalRemaining += node.getRemaining();
+ target -= node.getRemaining();
+ if (target <= 0) {
+ return node;
}
}
- //
- // Now pick one
- //
- DatanodeInfo target = null;
- if (okTargets.size() > 0) {
- //
- // Repeatedly choose random byte of the total bytes free.
- // The machine that has that byte will be our target. Thus,
- // we select at random with bias toward machines with greater
- // available storage.
- //
- long targetByte = r.nextLong(totalRemaining);
- for (Iterator it = okTargets.iterator(); it.hasNext(); ) {
- DatanodeInfo node = (DatanodeInfO) it.next();
- targetByte -= node.getRemaining();
- if (targetByte <= 0) {
- target = node;
- break;
- }
- }
- }
- return target;
+ LOG.info("Impossible state. When trying to choose target node, could not find any. This may indicate that datanode capacities are being updated during datanode selection. Anyway, now returning an arbitrary target to recover...");
+ return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
}
- **/
}
}