You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/02 21:17:07 UTC

svn commit: r227068 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java

Author: mc
Date: Tue Aug  2 12:17:05 2005
New Revision: 227068

URL: http://svn.apache.org/viewcvs?rev=227068&view=rev
Log:

  One feature, one bug fix.

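  Bug: We were mistakenly generating negative index numbers when
choosing a datanode, because "r.nextInt() % targetList.size()" can
be negative.  We now call "r.nextInt(targetList.size())" instead,
which always yields a valid index.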

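  Feature: We can now really choose a target datanode according to
available capacity.  This used to be broken (and was commented-out).
It is enabled by setting "ndfs.availability.allocation" to true; the
default is false, which keeps the uniform random choice.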


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=227068&r1=227067&r2=227068&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Aug  2 12:17:05 2005
@@ -47,6 +47,9 @@
     // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
     final static long HEARTBEAT_RECHECK = 1000;
 
+    // Whether we should use disk-availability info when determining target
+    final static boolean USE_AVAILABILITY = NutchConf.get().getBoolean("ndfs.availability.allocation", false);
+
     //
     // Stores the correct file name hierarchy
     //
@@ -1176,6 +1179,7 @@
         //
         // Now build list of machines we can actually choose from
         //
+        long totalRemaining = 0;
         Vector targetList = new Vector();
         for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
@@ -1183,6 +1187,7 @@
                 (forbidden2 == null || ! forbidden2.contains(node)) &&
                 (! forbiddenMachines.contains(node.getHost()))) {
                 targetList.add(node);
+                totalRemaining += node.getRemaining();
             }
         }
 
@@ -1191,52 +1196,23 @@
         //
         if (targetList.size() == 0) {
             return null;
+        } else if (! USE_AVAILABILITY) {
+            int target = r.nextInt(targetList.size());
+            return (DatanodeInfo) targetList.elementAt(target);
         } else {
-            return (DatanodeInfo) targetList.elementAt(r.nextInt() % targetList.size());
-        }
-        /**
-         * Choose target weighted by available storage
-         */
-        /**
-        synchronized (datanodeMap) {
-            if (datanodeMap.size() == 0) {
-                return;
-            }
+            // Choose node according to target capacity
+            double target = r.nextDouble() * totalRemaining;
 
-            long totalRemaining = 0;
-            Vector okTargets = new Vector();
-            for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
+            for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                if ((alreadyHasNode == null || ! alreadyHasNode.contains(node)) &&
-                    (alreadyChosen == null || ! alreadyChosen.contains(node))) {
-                    okTargets.add(node);
-                    totalRemaining += node.getRemaining();
+                target -= node.getRemaining();
+                if (target <= 0) {
+                    return node;
                 }
             }
 
-            //
-            // Now pick one
-            //
-            DatanodeInfo target = null;
-            if (okTargets.size() > 0) {
-                //
-                // Repeatedly choose random byte of the total bytes free.
-                // The machine that has that byte will be our target.  Thus,
-                // we select at random with bias toward machines with greater
-                // available storage.
-                //
-                long targetByte = r.nextLong(totalRemaining);
-                for (Iterator it = okTargets.iterator(); it.hasNext(); ) {
-                    DatanodeInfo node = (DatanodeInfO) it.next();
-                    targetByte -= node.getRemaining();
-                    if (targetByte <= 0) {
-                        target = node;
-                        break;
-                    }
-                }
-            }
-            return target;
+            LOG.info("Impossible state.  When trying to choose target node, could not find any.  This may indicate that datanode capacities are being updated during datanode selection.  Anyway, now returning an arbitrary target to recover...");
+            return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
         }
-        **/
     }
 }