Posted to common-commits@hadoop.apache.org by mc...@apache.org on 2006/03/05 02:02:34 UTC

svn commit: r383247 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs: ClientProtocol.java DFSClient.java FSConstants.java FSNamesystem.java NameNode.java

Author: mc
Date: Sat Mar  4 17:02:33 2006
New Revision: 383247

URL: http://svn.apache.org/viewcvs?rev=383247&view=rev
Log:

  Fix bug HADOOP-26.  Available space is now considered
correctly during DFS block-allocation.



Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=383247&r1=383246&r2=383247&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Sat Mar  4 17:02:33 2006
@@ -51,7 +51,7 @@
      * create multi-block files must also use reportWrittenBlock()
      * and addBlock().
      */
-    public LocatedBlock create(String src, String clientName, boolean overwrite) throws IOException;
+    public LocatedBlock create(String src, String clientName, String clientMachine, boolean overwrite) throws IOException;
 
     /**
      * A client that has written a block of data can report completion
@@ -81,7 +81,7 @@
      * A null response means the NameNode could not allocate a block,
      * and that the caller should try again.
      */
-    public LocatedBlock addBlock(String src) throws IOException;
+    public LocatedBlock addBlock(String src, String clientMachine) throws IOException;
 
     /**
      * A client that wants to abandon writing to the current file

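For context, a minimal sketch of how a caller drives the revised protocol (not taken from the patch; the real methods return LocatedBlock and are invoked over Hadoop IPC, so the stub and the String return type here are simplifications): the first block of a file comes from create(), each later block from addBlock(), and both calls now carry the client's hostname so the namenode can favor a local, non-full datanode.

    import java.io.IOException;

    // Hypothetical, simplified mirror of the two revised ClientProtocol methods;
    // the real interface returns LocatedBlock and is called over Hadoop IPC.
    interface SimplifiedClientProtocol {
        String create(String src, String clientName, String clientMachine,
                      boolean overwrite) throws IOException;
        String addBlock(String src, String clientMachine) throws IOException;
    }

    public class ProtocolCallSketch {
        public static void main(String[] args) throws IOException {
            SimplifiedClientProtocol namenode = new SimplifiedClientProtocol() {
                public String create(String src, String clientName,
                                     String clientMachine, boolean overwrite) {
                    return "first block, placed near " + clientMachine;
                }
                public String addBlock(String src, String clientMachine) {
                    return "next block, placed near " + clientMachine;
                }
            };

            String clientMachine = "dn-host-1";  // hostname the client reports
            // First block of a new file, then additional blocks as the write grows.
            System.out.println(namenode.create("/user/foo", "DFSClient_1", clientMachine, true));
            System.out.println(namenode.addBlock("/user/foo", clientMachine));
        }
    }
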
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=383247&r1=383246&r2=383247&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Sat Mar  4 17:02:33 2006
@@ -574,9 +574,9 @@
                 LocatedBlock lb = null;                
                 while (! blockComplete) {
                     if (firstTime) {
-                        lb = namenode.create(src.toString(), clientName.toString(), overwrite);
+                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);
                     } else {
-                        lb = namenode.addBlock(src.toString());
+                        lb = namenode.addBlock(src.toString(), localName);
                     }
 
                     if (lb == null) {

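The hunk above passes a new localName value but does not show how it is initialized; a plausible way for the client to obtain the hostname it reports (an assumption for illustration, not part of this patch) is java.net.InetAddress:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class LocalNameSketch {
        // Best-effort local hostname; ideally this matches the host string a
        // datanode running on the same machine registers with the namenode.
        static String localName() {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                // An empty clientMachine makes the namenode skip the local-node
                // preference and fall back to capacity-weighted selection.
                return "";
            }
        }

        public static void main(String[] args) {
            System.out.println("clientMachine = " + localName());
        }
    }
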
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=383247&r1=383246&r2=383247&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Sat Mar  4 17:02:33 2006
@@ -24,7 +24,7 @@
  ************************************/
 interface FSConstants {
     public static int BLOCK_SIZE = 32 * 1000 * 1000;
-    //public static int BLOCK_SIZE = 19;
+    public static int MIN_BLOCKS_FOR_WRITE = 5;
 
     public static final long WRITE_COMPLETE = 0xcafae11a;
 

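The new constant is used below in FSNamesystem as a free-space floor: a datanode qualifies as a write target only if it has room for at least MIN_BLOCKS_FOR_WRITE full blocks. A quick check of the resulting threshold (plain arithmetic, not code from the patch):

    public class WriteThresholdSketch {
        public static void main(String[] args) {
            int blockSize = 32 * 1000 * 1000;   // FSConstants.BLOCK_SIZE
            int minBlocksForWrite = 5;          // FSConstants.MIN_BLOCKS_FOR_WRITE
            long threshold = (long) blockSize * minBlocksForWrite;
            // 160,000,000 bytes, roughly 153 MiB of free space per eligible node.
            System.out.println("minimum remaining space for a write target: " + threshold + " bytes");
        }
    }
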
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=383247&r1=383246&r2=383247&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Sat Mar  4 17:02:33 2006
@@ -38,8 +38,6 @@
 class FSNamesystem implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
 
-   
-
     //
     // Stores the correct file name hierarchy
     //
@@ -144,11 +142,7 @@
     private int minReplication;
     // HEARTBEAT_RECHECK is how often a datanode sends its heartbeat
     private int heartBeatRecheck;
-   //  Whether we should use disk-availability info when determining target
-    private boolean useAvailability;
 
-    private boolean allowSameHostTargets;
-    
     /**
      * dir is where the filesystem directory state 
      * is stored
@@ -167,9 +161,6 @@
         this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
         this.minReplication = 1;
         this.heartBeatRecheck= 1000;
-        this.useAvailability = conf.getBoolean("dfs.availability.allocation", false);
-        this.allowSameHostTargets =
-           conf.getBoolean("test.dfs.same.host.targets.allowed", false);
     }
 
     /** Close down this filesystem manager.
@@ -241,7 +232,7 @@
      * of machines, or null if src is invalid for creation (based on
      * {@link FSDirectory#isValidToCreate(UTF8)}.
      */
-    public synchronized Object[] startFile(UTF8 src, UTF8 holder, boolean overwrite) {
+    public synchronized Object[] startFile(UTF8 src, UTF8 holder, UTF8 clientMachine, boolean overwrite) {
         Object results[] = null;
         if (pendingCreates.get(src) == null) {
             boolean fileValid = dir.isValidToCreate(src);
@@ -254,7 +245,7 @@
                 results = new Object[2];
 
                 // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
                 if (targets.length < this.minReplication) {
                     LOG.warning("Target-length is " + targets.length +
                         ", below MIN_REPLICATION (" + this.minReplication+ ")");
@@ -300,7 +291,7 @@
      * are replicated.  Will return an empty 2-elt array if we want the
      * client to "try again later".
      */
-    public synchronized Object[] getAdditionalBlock(UTF8 src) {
+    public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
         Object results[] = null;
         if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
             results = new Object[2];
@@ -310,7 +301,7 @@
             //
             if (checkFileProgress(src)) {
                 // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
+                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
                 if (targets.length < this.minReplication) {
                     return null;
                 }
@@ -1171,7 +1162,7 @@
                     } else {
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
                         if (containingNodes.contains(srcNode)) {
-                            DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes);
+                            DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
@@ -1228,12 +1219,12 @@
      * considered targets.
      * @return array of DatanodeInfo instances used as targets.
      */
-    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) {
+    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
         TreeSet alreadyChosen = new TreeSet();
         Vector targets = new Vector();
 
         for (int i = 0; i < desiredReplicates; i++) {
-            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen);
+            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
             if (target != null) {
                 targets.add(target);
                 alreadyChosen.add(target);
@@ -1256,7 +1247,7 @@
      * @return DatanodeInfo instance to use or null if something went wrong
      * (a log message is emitted if null is returned).
      */
-    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2) {
+    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
         //
         // Check if there are any available targets at all
         //
@@ -1266,84 +1257,87 @@
             return null;
         }
 
-        TreeSet forbiddenMachines = new TreeSet();
         //
-        // In addition to already-chosen datanode/port pairs, we want to avoid
-        // already-chosen machinenames.  (There can be multiple datanodes per
-        // machine.)  We might relax this requirement in the future, though. (Maybe
-        // so that at least one replicate is off the machine.)
+        // Build a map of forbidden hostnames from the two forbidden sets.
         //
-        UTF8 hostOrHostAndPort = null;
+        TreeSet forbiddenMachines = new TreeSet();
         if (forbidden1 != null) {
-          // add name [and host] of all elements in forbidden1 to forbiddenMachines
             for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-                if (allowSameHostTargets) {
-                  hostOrHostAndPort = cur.getName(); // forbid same host:port
-                } else {
-                  hostOrHostAndPort = cur.getHost(); // forbid same host
-                }
-                forbiddenMachines.add(hostOrHostAndPort);
+                forbiddenMachines.add(cur.getHost());
             }
         }
         if (forbidden2 != null) {
-          // add name [and host] of all elements in forbidden2 to forbiddenMachines
             for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-              if (allowSameHostTargets) {
-                hostOrHostAndPort = cur.getName(); // forbid same host:port
-              } else {
-                hostOrHostAndPort = cur.getHost(); // forbid same host
-              }
-              forbiddenMachines.add(hostOrHostAndPort);
+                forbiddenMachines.add(cur.getHost());
             }
         }
 
         //
-        // Now build list of machines we can actually choose from
+        // Build list of machines we can actually choose from
         //
-        long totalRemaining = 0;
+        long latestRemaining = 0;
         Vector targetList = new Vector();
         for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
-            if (allowSameHostTargets) {
-                hostOrHostAndPort = node.getName(); // match host:port
-            } else {
-                hostOrHostAndPort = node.getHost(); // match host
-            }
-            if (! forbiddenMachines.contains(hostOrHostAndPort)) {
+            if (! forbiddenMachines.contains(node.getHost())) {
                 targetList.add(node);
-                totalRemaining += node.getRemaining();
+                latestRemaining += node.getRemaining();
             }
         }
 
         //
         // Now pick one
         //
-        if (targetList.size() == 0) {
-            LOG.warning("Zero targets found, forbidden1.size=" +
-                ( forbidden1 != null ? forbidden1.size() : 0 ) +
-                " allowSameHostTargets=" + allowSameHostTargets +
-                " forbidden2.size()=" +
-                ( forbidden2 != null ? forbidden2.size() : 0 ));
-            return null;
-        } else if (! this.useAvailability) {
-            int target = r.nextInt(targetList.size());
-            return (DatanodeInfo) targetList.elementAt(target);
-        } else {
-            // Choose node according to target capacity
-            double target = r.nextDouble() * totalRemaining;
+        if (targetList.size() > 0) {
+            //
+            // If the requester's machine is in the targetList, 
+            // and it's got the capacity, pick it.
+            //
+            if (clientMachine != null && clientMachine.getLength() > 0) {
+                for (Iterator it = targetList.iterator(); it.hasNext(); ) {
+                    DatanodeInfo node = (DatanodeInfo) it.next();
+                    if (clientMachine.equals(node.getHost())) {
+                        if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+                            return node;
+                        }
+                    }
+                }
+            }
 
+            //
+            // Otherwise, choose node according to target capacity
+            //
+            double target = Math.abs(r.nextDouble()) * latestRemaining;
             for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
                 target -= node.getRemaining();
-                if (target <= 0) {
+                if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) &&
+                    (target <= 0)) {
                     return node;
                 }
             }
 
-            LOG.warning("Impossible state.  When trying to choose target node, could not find any.  This may indicate that datanode capacities are being updated during datanode selection.  Anyway, now returning an arbitrary target to recover...");
-            return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
+            //
+            // That should do the trick.  But we might not be able
+            // to pick any node if the target was out of bytes.  As
+            // a last resort, pick the first valid one we can find.
+            //
+            for (Iterator it = targetList.iterator(); it.hasNext(); ) {
+                DatanodeInfo node = (DatanodeInfo) it.next();
+                if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+                    return node;
+                }
+            }
+            LOG.warning("Could not find any nodes with sufficient capacity");
+            return null;
+        } else {
+            LOG.warning("Zero targets found, forbidden1.size=" +
+                ( forbidden1 != null ? forbidden1.size() : 0 ) +
+                " forbidden2.size()=" +
+                ( forbidden2 != null ? forbidden2.size() : 0 ));
+            return null;
         }
     }
 }

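Pulling the FSNamesystem changes together, here is a minimal, self-contained sketch of the new selection policy (the Node type and pickTarget name are hypothetical stand-ins for DatanodeInfo and chooseTarget, and forbidden-node filtering is omitted): prefer the requester's own host if it has room for MIN_BLOCKS_FOR_WRITE blocks, otherwise pick a node at random weighted by remaining space, and as a last resort take the first node that clears the space floor.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    public class ChooseTargetSketch {
        static final long BLOCK_SIZE = 32L * 1000 * 1000;   // FSConstants.BLOCK_SIZE
        static final int MIN_BLOCKS_FOR_WRITE = 5;          // FSConstants.MIN_BLOCKS_FOR_WRITE
        static final Random r = new Random();

        // Hypothetical stand-in for DatanodeInfo.
        static final class Node {
            final String host;
            final long remaining;   // bytes of free space reported by the datanode
            Node(String host, long remaining) { this.host = host; this.remaining = remaining; }
        }

        static boolean hasRoom(Node n) {
            return n.remaining > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE;
        }

        // Sketch of the post-patch policy; forbidden-node filtering is omitted.
        static Node pickTarget(List<Node> candidates, String clientMachine) {
            // 1. Prefer the requester's own machine if it has enough free space.
            if (clientMachine != null && clientMachine.length() > 0) {
                for (Node n : candidates) {
                    if (clientMachine.equals(n.host) && hasRoom(n)) {
                        return n;
                    }
                }
            }

            // 2. Otherwise choose randomly, weighted by remaining capacity,
            //    skipping nodes that fall below the free-space floor.
            long totalRemaining = 0;
            for (Node n : candidates) {
                totalRemaining += n.remaining;
            }
            double target = r.nextDouble() * totalRemaining;
            for (Node n : candidates) {
                target -= n.remaining;
                if (hasRoom(n) && target <= 0) {
                    return n;
                }
            }

            // 3. Last resort: the first node that still clears the floor.
            for (Node n : candidates) {
                if (hasRoom(n)) {
                    return n;
                }
            }
            return null;   // nothing has sufficient capacity; caller logs a warning
        }

        public static void main(String[] args) {
            List<Node> nodes = Arrays.asList(
                new Node("dn-host-1", 50L * 1000 * 1000),    // below the write floor
                new Node("dn-host-2", 900L * 1000 * 1000),
                new Node("dn-host-3", 400L * 1000 * 1000));
            Node picked = pickTarget(nodes, "dn-host-1");    // client runs on dn-host-1
            System.out.println(picked == null ? "no target" : picked.host);
        }
    }

As in the patch, nodes below the floor still contribute to the random weighting, so on a nearly full cluster the weighted pass can fall through to the last-resort scan or to the warning-and-null case; per the ClientProtocol javadoc above, a null response tells the caller to try again.
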
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=383247&r1=383246&r2=383247&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Sat Mar  4 17:02:33 2006
@@ -131,8 +131,8 @@
 
     /**
      */
-    public LocatedBlock create(String src, String clientName, boolean overwrite) throws IOException {
-        Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), overwrite);
+    public LocatedBlock create(String src, String clientName, String clientMachine, boolean overwrite) throws IOException {
+        Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), new UTF8(clientMachine), overwrite);
         if (results == null) {
             throw new IOException("Cannot create file " + src + " on client " + clientName);
         } else {
@@ -144,15 +144,15 @@
 
     /**
      */
-    public LocatedBlock addBlock(String src) throws IOException {
+    public LocatedBlock addBlock(String src, String clientMachine) throws IOException {
         int retries = 5;
-        Object results[] = namesystem.getAdditionalBlock(new UTF8(src));
+        Object results[] = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));
         while (results != null && results[0] == null && retries > 0) {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException ie) {
             }
-            results = namesystem.getAdditionalBlock(new UTF8(src));
+            results = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine));
             retries--;
         }