You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/26 22:26:40 UTC

svn commit: r240336 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: DataNode.java DatanodeInfo.java DatanodeProtocol.java FSConstants.java FSNamesystem.java NameNode.java

Author: mc
Date: Fri Aug 26 13:26:38 2005
New Revision: 240336

URL: http://svn.apache.org/viewcvs?rev=240336&view=rev
Log:

  Change a few things about block-replication.
  1) Merge OBSOLETE and BLOCKREPORT checks.
  2) Adjust constants.  Block reports now happen every ten minutes.
  3) Delete over-replicated blocks.  Over-replication can
     happen when a datanode goes offline, then returns.


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Fri Aug 26 13:26:38 2005
@@ -148,14 +148,17 @@
                     // -- Bytes remaining
                     //
                     namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
-                    LOG.info("Just sent heartbeat, with name " + localName);
+                    //LOG.info("Just sent heartbeat, with name " + localName);
                     lastHeartbeat = now;
 		}
 		if (now - lastBlockReport > blockReportInterval) {
                     //
-                    // Send latest blockinfo report if timer has expired
+                    // Send latest blockinfo report if timer has expired.
+                    // Get back a list of local block(s) that are obsolete
+                    // and can be safely GC'ed.
                     //
-                    namenode.blockReport(localName, data.getBlockReport());
+                    Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
+                    data.invalidate(toDelete);
                     lastBlockReport = now;
 		}
 		if (receivedBlockList.size() > 0) {
@@ -182,7 +185,7 @@
 		    // namenode that this datanode should perform.
 		    //
 		    BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
-		    if (cmd.transferBlocks()) {
+		    if (cmd != null && cmd.transferBlocks()) {
 			//
 			// Send a copy of a block to another datanode
 			//
@@ -202,14 +205,14 @@
 				}
 			    }
 			}
-		    } else if (cmd.invalidateBlocks()) {
-			//
-			// Some local block(s) are obsolete and can be 
-			// safely garbage-collected.
-			//
-			data.invalidate(cmd.getBlocks());
-		    }
-		}
+                    } else if (cmd != null && cmd.invalidateBlocks()) {
+                        //
+                        // Some local block(s) are obsolete and can be 
+                        // safely garbage-collected.
+                        //
+                        data.invalidate(cmd.getBlocks());
+                    }
+                }
 
                 //
                 // There is no work to do;  sleep until hearbeat timer elapses, 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java Fri Aug 26 13:26:38 2005
@@ -28,7 +28,7 @@
  **************************************************/
 public class DatanodeInfo implements Writable, Comparable {
     UTF8 name;
-    long capacity, remaining, lastUpdate, lastObsoleteCheck;
+    long capacity, remaining, lastUpdate;
     volatile TreeSet blocks;
 
     /**
@@ -41,7 +41,6 @@
         this.name = name;
         int colon = name.toString().indexOf(":");
         this.blocks = new TreeSet();
-        this.lastObsoleteCheck = System.currentTimeMillis();
         updateHeartbeat(0, 0);        
     }
 
@@ -50,7 +49,6 @@
     public DatanodeInfo(UTF8 name, long capacity, long remaining) {
         this.name = name;
         this.blocks = new TreeSet();
-        this.lastObsoleteCheck = System.currentTimeMillis();
         updateHeartbeat(capacity, remaining);
     }
 
@@ -105,12 +103,6 @@
     }
     public long lastUpdate() {
         return lastUpdate;
-    }
-    public void updateObsoleteCheck() {
-        this.lastObsoleteCheck = System.currentTimeMillis();
-    }
-    public long lastObsoleteCheck() {
-        return lastObsoleteCheck;
     }
 
     /////////////////////////////////////////////////

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java Fri Aug 26 13:26:38 2005
@@ -28,7 +28,7 @@
 public interface DatanodeProtocol {
 
     public void sendHeartbeat(String sender, long capacity, long remaining) throws IOException;
-    public void blockReport(String sender, Block blocks[]) throws IOException;
+    public Block[] blockReport(String sender, Block blocks[]) throws IOException;
     public void blockReceived(String sender, Block blocks[]) throws IOException;
     public void errorReport(String sender, String msg) throws IOException;
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java Fri Aug 26 13:26:38 2005
@@ -100,8 +100,7 @@
     //
     public static long HEARTBEAT_INTERVAL = 3 * 1000;
     public static long EXPIRE_INTERVAL = 60 * 1000;
-    public static long BLOCKREPORT_INTERVAL = 9 * 60 * 1000;
-    public static long OBSOLETE_INTERVAL = 10 * 60 * 1000;
+    public static long BLOCKREPORT_INTERVAL = 10 * 60 * 1000;
     public static long DATANODE_STARTUP_PERIOD = 120 * 1000;
     public static long LEASE_PERIOD = 16 * 1000;
     public static int READ_TIMEOUT = 20 * 1000;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Aug 26 13:26:38 2005
@@ -38,6 +38,9 @@
     final static int DESIRED_REPLICATION =
       NutchConf.get().getInt("ndfs.replication", 3);
 
+    // The maximum number of replicates we should allow for a single block
+    final static int MAX_REPLICATION = DESIRED_REPLICATION;
+
     // How many outgoing replication streams a given node should have at one time
     final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
 
@@ -76,6 +79,13 @@
     TreeMap recentInvalidateSets = new TreeMap();
 
     //
+    // Keeps a TreeSet for every named node.  Each treeset contains
+    // a list of the blocks that are "extra" at that location.  We'll
+    // eventually remove these extras.
+    //
+    TreeMap excessReplicateMap = new TreeMap();
+
+    //
     // Keeps track of files that are being created, plus the
     // blocks that make them up.
     //
@@ -825,7 +835,7 @@
      * The given node is reporting all its blocks.  Use this info to 
      * update the (machine-->blocklist) and (block-->machinelist) tables.
      */
-    public synchronized void processReport(Block newReport[], UTF8 name) {
+    public synchronized Block[] processReport(Block newReport[], UTF8 name) {
         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
         if (node == null) {
             throw new IllegalArgumentException("Unexpected exception.  Received block report from node " + name + ", but there is no info for " + name);
@@ -869,6 +879,29 @@
         // Modify node so it has the new blockreport
         //
         node.updateBlocks(newReport);
+
+        //
+        // We've now completely updated the node's block report profile.
+        // We now go through all its blocks and find which ones are neither
+        // valid nor pending creation; these are returned as obsolete.
+        //
+        // (Note it's not enough to just invalidate blocks at lease expiry 
+        // time; datanodes can go down before the client's lease on 
+        // the failed file expires and miss the "expire" event.)
+        //
+        // This function considers every block on a datanode, and thus
+        // should only be invoked infrequently.
+        //
+        Vector obsolete = new Vector();
+        for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
+            Block b = (Block) it.next();
+
+            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+                LOG.info("Obsoleting block " + b);
+                obsolete.add(b);
+            }
+        }
+        return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
     }
 
     /**
@@ -889,7 +922,6 @@
 
         synchronized (neededReplications) {
             if (dir.isValidBlock(block)) {
-              //LOG.info("Node " + node + " is reporting stored block " + block);
                 if (containingNodes.size() >= DESIRED_REPLICATION) {
                     neededReplications.remove(block);
                     pendingReplications.remove(block);
@@ -898,11 +930,68 @@
                         neededReplications.add(block);
                     }
                 }
+
+                //
+                // Find how many of the containing nodes are "extra", if any.
+                // If there are any extras, call chooseExcessReplicates() to
+                // mark them in the excessReplicateMap.
+                //
+                Vector nonExcess = new Vector();
+                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
+                    DatanodeInfo cur = (DatanodeInfo) it.next();
+                    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+                    if (excessBlocks == null || ! excessBlocks.contains(block)) {
+                        nonExcess.add(cur);
+                    }
+                }
+                if (nonExcess.size() > MAX_REPLICATION) {
+                    chooseExcessReplicates(nonExcess, block, MAX_REPLICATION);    
+                }
             }
         }
     }
 
     /**
+     * We want a max of "maxReps" replicates for any block, but we now have too many.
+     * Remove nodes from 'nonExcess', marking 'b' as excess on each one, until:
+     *
+     * nonExcess.size() == maxReps
+     *
+     * For now, we choose nodes randomly.  In the future, we might enforce some
+     * kind of policy (like making sure replicates are spread across racks).
+     */
+    void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
+        while (nonExcess.size() - maxReps > 0) {
+            int chosenNode = r.nextInt(nonExcess.size());
+            DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
+            nonExcess.removeElementAt(chosenNode);
+
+            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+            if (excessBlocks == null) {
+                excessBlocks = new TreeSet();
+                excessReplicateMap.put(cur.getName(), excessBlocks);
+            }
+            excessBlocks.add(b);
+
+            //
+            // The 'excessblocks' tracks blocks until we get confirmation
+            // that the datanode has deleted them; the only way we remove them
+            // is when we get a "removeBlock" message.  
+            //
+            // The 'invalidate' list is used to inform the datanode the block 
+            // should be deleted.  Items are removed from the invalidate list
+            // upon giving instructions to the datanode.
+            //
+            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
+            if (invalidateSet == null) {
+                invalidateSet = new Vector();
+                recentInvalidateSets.put(cur.getName(), invalidateSet);
+            }
+            invalidateSet.add(b);
+        }
+    }
+
+    /**
      * Modify (block-->datanode) map.  Possibly generate 
      * replication tasks, if the removed block is still valid.
      */
@@ -924,6 +1013,18 @@
                 neededReplications.add(block);
             }
         }
+
+        //
+        // We've removed a block from a node, so it's definitely no longer
+        // in "excess" there.
+        //
+        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
+        if (excessBlocks != null) {
+            excessBlocks.remove(block);
+            if (excessBlocks.size() == 0) {
+                excessReplicateMap.remove(node.getName());
+            }
+        }
     }
 
     /**
@@ -984,49 +1085,14 @@
     /////////////////////////////////////////////////////////
 
     /**
-     * Return with a list of Blocks that should be invalidated
-     * at the given node.  Done in response to a file delete, which 
-     * eliminates a number of blocks from the universe.
-     */
-    public synchronized Block[] recentlyInvalidBlocks(UTF8 name) {
-        Vector invalidateSet = (Vector) recentInvalidateSets.remove(name);
-        if (invalidateSet == null) {
-            return null;
-        } else {
-            return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
-        }
-    }
-
-    /**
-     * If the node has not been checked in some time, go through
-     * its blocks and find which ones are neither valid nor pending.
-     * It often happens that a client will start writing blocks and
-     * then exit.  The blocks are on-disk, but the file will be 
-     * abandoned.
-     *
-     * It's not enough to invalidate blocks at lease expiry time;
-     * datanodes can go down before the client's lease on 
-     * the failed file expires and miss the "expire" event.
-     *
-     * This function considers every block on a datanode, and thus
-     * should only be invoked infrequently.
+     * Check if there are any recently-deleted blocks a datanode should remove.
      */
-    public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
-        DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
-        if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
-            return null;
+    public synchronized Block[] blocksToInvalidate(UTF8 sender) {
+        Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);
+        if (invalidateSet != null) {
+            return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
         } else {
-            nodeInfo.updateObsoleteCheck();
-            Vector obsolete = new Vector();
-            for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
-                Block b = (Block) it.next();
-
-                if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
-                    LOG.info("Obsoleting block " + b);
-                    obsolete.add(b);
-                }
-            }
-            return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
+            return null;
         }
     }
 
@@ -1066,7 +1132,6 @@
                         it.remove();
                     } else {
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
-
                         if (containingNodes.contains(srcNode)) {
                             DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
                             if (targets.length > 0) {
@@ -1166,13 +1231,13 @@
         if (forbidden1 != null) {
             for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-                forbiddenMachines.add(cur.getHost());
+                forbiddenMachines.add(cur.getName());
             }
         }
         if (forbidden2 != null) {
             for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-                forbiddenMachines.add(cur.getHost());
+                forbiddenMachines.add(cur.getName());
             }
         }
 
@@ -1185,7 +1250,7 @@
             DatanodeInfo node = (DatanodeInfo) it.next();
             if ((forbidden1 == null || ! forbidden1.contains(node)) &&
                 (forbidden2 == null || ! forbidden2.contains(node)) &&
-                (! forbiddenMachines.contains(node.getHost()))) {
+                (! forbiddenMachines.contains(node.getName()))) {
                 targetList.add(node);
                 totalRemaining += node.getRemaining();
             }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Fri Aug 26 13:26:38 2005
@@ -262,9 +262,9 @@
         namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
     }
 
-    public void blockReport(String sender, Block blocks[]) {
+    public Block[] blockReport(String sender, Block blocks[]) {
         LOG.info("Block report from "+sender+": "+blocks.length+" blocks.");
-        namesystem.processReport(blocks, new UTF8(sender));
+        return namesystem.processReport(blocks, new UTF8(sender));
     }
 
     public void blockReceived(String sender, Block blocks[]) {
@@ -281,7 +281,8 @@
     }
 
     /**
-     * Return a block-oriented command for the datanode to execute
+     * Return a block-oriented command for the datanode to execute.
+     * This will be either a transfer or a delete operation.
      */
     public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
         //
@@ -293,17 +294,16 @@
         }
 
         //
-        // If none, check to see if there are blocks to invalidate
+        // If there are no transfers, check for recently-invalidated blocks that
+        // should be removed.  This is not a full-datanode sweep, as is done during
+        // a block report.  This is just a small fast removal of blocks that have
+        // just been deleted from the namespace or marked as excess replicates.
         //
-        Block blocks[] = namesystem.recentlyInvalidBlocks(new UTF8(sender));
-        if (blocks == null) {
-            blocks = namesystem.checkObsoleteBlocks(new UTF8(sender));
-        }
+        Block blocks[] = namesystem.blocksToInvalidate(new UTF8(sender));
         if (blocks != null) {
             return new BlockCommand(blocks);
         }
-
-        return new BlockCommand();
+        return null;
     }
 
     /**