You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/07/19 20:21:31 UTC

svn commit: r219743 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: DataNode.java DatanodeProtocol.java FSConstants.java FSDataset.java FSNamesystem.java NameNode.java

Author: mc
Date: Tue Jul 19 11:21:19 2005
New Revision: 219743

URL: http://svn.apache.org/viewcvs?rev=219743&view=rev
Log:

  Fix a few problems with block replication.

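  1) Wait until DATANODE_STARTUP_PERIOD (now 120 seconds) has passed
since the datanode connected before it performs any block work
(replication transfers or deletions).  We previously had a startup
timer at the namenode, which is not sufficient: the namenode may have
been running long before the datanodes (re)connect, e.g. after a
network interruption, so many blocks were replicated unnecessarily.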

  2) Throttle replications so a single node does not have more than
K outgoing replication streams at a time (K is set by
ndfs.max-repl-streams, default 2).

  3) Fixed a bug where an unexpected error while creating a block's
temp file would leave the block registered as an ongoing create,
making it seem as if the write was still in progress.


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Tue Jul 19 11:21:19 2005
@@ -44,12 +44,6 @@
     //private static long numGigs = NutchConf.get().getLong("ndfs.datanode.maxgigs", 100);
     //
 
-    //
-    // Eventually, this constant should be computed dynamically using 
-    // load information
-    //
-    private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3;
-
     /**
      * Util method to build socket addr from string
      */
@@ -69,6 +63,7 @@
     FSDataset data;
     String localName;
     Vector receivedBlockList = new Vector();
+    int xmitsInProgress = 0;
 
     /**
      * Create using configured defaults and dataDir.
@@ -155,39 +150,49 @@
                     namenode.blockReceived(localName, blockArray);
                 }
 
-                //
-                // Check to see if there are any block-instructions from the
-                // namenode that this datanode should perform.
-                //
-                BlockCommand cmd = namenode.getBlockwork(localName);
-                if (cmd.transferBlocks()) {
-                    //
-                    // Send a copy of a block to another datanode
-                    //
-                    Block blocks[] = cmd.getBlocks();
-                    DatanodeInfo xferTargets[][] = cmd.getTargets();
-
-                    for (int i = 0; i < blocks.length; i++) {
-                        if (!data.isValidBlock(blocks[i])) {
-                            String errStr = "Can't send invalid block " + blocks[i];
-                            LOG.info(errStr);
-                            namenode.errorReport(localName, errStr);
-                            break;
-                        } else {
-                            if (xferTargets[i].length > 0) {
-                                LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
-                                new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
-                            }
-                        }
-                    }
-                } else if (cmd.invalidateBlocks()) {
-                    //
-                    // Some local block(s) are obsolete and can be 
-                    // safely garbage-collected.
-                    //
-                    data.invalidate(cmd.getBlocks());
-                }
-
+		//
+		// Only perform block operations (transfer, delete) after 
+		// a startup quiet period.  The assumption is that all the
+		// datanodes will be started together, but the namenode may
+		// have been started some time before.  (This is esp. true in
+		// the case of network interruptions.)  So, wait for some time
+		// to pass from the time of connection to the first block-transfer.
+		// Otherwise we transfer a lot of blocks unnecessarily.
+		//
+		if (now - sendStart > DATANODE_STARTUP_PERIOD) {
+		    //
+		    // Check to see if there are any block-instructions from the
+		    // namenode that this datanode should perform.
+		    //
+		    BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
+		    if (cmd.transferBlocks()) {
+			//
+			// Send a copy of a block to another datanode
+			//
+			Block blocks[] = cmd.getBlocks();
+			DatanodeInfo xferTargets[][] = cmd.getTargets();
+			
+			for (int i = 0; i < blocks.length; i++) {
+			    if (!data.isValidBlock(blocks[i])) {
+				String errStr = "Can't send invalid block " + blocks[i];
+				LOG.info(errStr);
+				namenode.errorReport(localName, errStr);
+				break;
+			    } else {
+				if (xferTargets[i].length > 0) {
+				    LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+				    new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+				}
+			    }
+			}
+		    } else if (cmd.invalidateBlocks()) {
+			//
+			// Some local block(s) are obsolete and can be 
+			// safely garbage-collected.
+			//
+			data.invalidate(cmd.getBlocks());
+		    }
+		}
 
                 //
                 // There is no work to do;  sleep until hearbeat timer elapses, 
@@ -501,6 +506,7 @@
          * Do the deed, write the bytes
          */
         public void run() {
+	    xmitsInProgress++;
             try {
                 Socket s = new Socket(curTarget.getAddress(), curTarget.getPort());
                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
@@ -534,9 +540,11 @@
                 } finally {
                     out.close();
                 }
-                LOG.info("Replicated block " + b + " to " + curTarget);
+                LOG.info("Transmitted block " + b + " to " + curTarget);
             } catch (IOException ie) {
-            }
+            } finally {
+		xmitsInProgress--;
+	    }
         }
     }
 
@@ -555,6 +563,7 @@
             try {
                 datanode.offerService();
             } catch (Exception ex) {
+		LOG.info("Exception: " + ex.toString());
                 LOG.info("Lost connection to namenode [" + datanode.getNamenode() + "].  Retrying...");
                 try {
                     Thread.sleep(5000);

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java Tue Jul 19 11:21:19 2005
@@ -32,5 +32,5 @@
     public void blockReceived(String sender, Block blocks[]);
     public void errorReport(String sender, String msg);
 
-    public BlockCommand getBlockwork(String sender);
+    public BlockCommand getBlockwork(String sender, int xmitsInProgress);
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java Tue Jul 19 11:21:19 2005
@@ -102,7 +102,6 @@
     public static long EXPIRE_INTERVAL = 4 * HEARTBEAT_INTERVAL;
     public static long BLOCKREPORT_INTERVAL = 9 * 60 * 1000;
     public static long OBSOLETE_INTERVAL = 10 * 60 * 1000;
-    public static long SYSTEM_STARTUP_PERIOD = 15 * 1000;
-    public static long DATANODE_STARTUP_PERIOD = 30 * 1000;
+    public static long DATANODE_STARTUP_PERIOD = 120 * 1000;
     public static long LEASE_PERIOD = 16 * 1000;
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java Tue Jul 19 11:21:19 2005
@@ -283,16 +283,21 @@
             reserved += BLOCK_SIZE;
             f = getTmpFile(b);
 
-            if (f.exists()) {
-                throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should not be present, but is.");
-            }
-        }
+	    try {
+		if (f.exists()) {
+		    throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should not be present, but is.");
+		}
 
-        //
-        // Create the zero-length temp file
-        //
-        if (!f.createNewFile()) {
-            throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should be creatable, but is already present.");
+		//
+		// Create the zero-length temp file
+		//
+		if (!f.createNewFile()) {
+		    throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should be creatable, but is already present.");
+		}
+	    } catch (IOException ie) {
+		ongoingCreates.remove(b);		
+		reserved -= BLOCK_SIZE;
+	    }
         }
 
         //

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Jul 19 11:21:19 2005
@@ -38,6 +38,9 @@
     final static int DESIRED_REPLICATION =
       NutchConf.get().getInt("ndfs.replication", 3);
 
+    // How many outgoing replication streams a given node should have at one time
+    final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
+
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     final static int MIN_REPLICATION = 1;
 
@@ -990,7 +993,7 @@
      */
     public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
         DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
-        if (nodeInfo == null || System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
+        if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
             return null;
         } else {
             nodeInfo.updateObsoleteCheck();
@@ -1017,17 +1020,10 @@
      *     target sequence for the Block at the appropriate index.
      *
      */
-    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int maxXfers) {
-        //
-        // Allow the namenode to come up and hear from all datanodes before
-        // making transfers.
-        //
-        if (System.currentTimeMillis() - systemStart < SYSTEM_STARTUP_PERIOD) {
-            return null;
-        }
-
+    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {
         synchronized (neededReplications) {
             Object results[] = null;
+	    int scheduledXfers = 0;
 
             if (neededReplications.size() > 0) {
                 //
@@ -1041,7 +1037,7 @@
                     //
                     // We can only reply with 'maxXfers' or fewer blocks
                     //
-                    if (replicateBlocks.size() >= maxXfers) {
+                    if (scheduledXfers >= MAX_REPLICATION_STREAMS - xmitsInProgress) {
                         break;
                     }
 
@@ -1052,11 +1048,12 @@
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
 
                         if (containingNodes.contains(srcNode)) {
-                            DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION - containingNodes.size(), containingNodes);
+                            DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
                                 replicateTargetSets.add(targets);
+				scheduledXfers += targets.length;
                             }
                         }
                     }
@@ -1064,9 +1061,9 @@
 
                 //
                 // Move the block-replication into a "pending" state.
-                // REMIND - mjc - the reason we use 'pending' is so we can retry
+                // The reason we use 'pending' is so we can retry
                 // replications that fail after an appropriate amount of time.  
-                // This is not yet implemented
+                // (REMIND - mjc - this timer is not yet implemented.)
                 //
                 if (replicateBlocks.size() > 0) {
                     int i = 0;
@@ -1079,13 +1076,14 @@
                             neededReplications.remove(block);
                             pendingReplications.add(block);
                         }
+
+			LOG.info("Pending transfer (block " + block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length + " destinations");
                     }
 
                     //
                     // Build returned objects from above lists
                     //
                     DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][];
-                    LOG.info("Pending transfer from " + srcNode.getName() + " to " + targetMatrix.length + " destinations");
                     for (i = 0; i < targetMatrix.length; i++) {
                         targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
                     }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Tue Jul 19 11:21:19 2005
@@ -36,12 +36,6 @@
  * @author Mike Cafarella
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
-    //
-    // Eventually, this constant should be computed dynamically using 
-    // load information
-    //
-    private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3;
-
     FSNamesystem namesystem;
     Server server;
 
@@ -243,11 +237,11 @@
     /**
      * Return a block-oriented command for the datanode to execute
      */
-    public BlockCommand getBlockwork(String sender) {
+    public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), MAX_BLOCKS_PER_ROUNDTRIP);
+        Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress);
         if (xferResults != null) {
             return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
         }