You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/01 21:45:26 UTC

svn commit: r398668 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java src/java/org/apache/hadoop/dfs/DatanodeProtocol.java src/java/org/apache/hadoop/dfs/NameNode.java

Author: cutting
Date: Mon May  1 12:45:24 2006
New Revision: 398668

URL: http://svn.apache.org/viewcvs?rev=398668&view=rev
Log:
HADOOP-178.  Piggyback DFS blockwork requests on heartbeat responses, reducing traffic.  Also move blockwork delay on startup from datanode to namenode, fixing a problem when the namenode alone restarts.  Contributed by Hairong Kuang.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May  1 12:45:24 2006
@@ -132,6 +132,12 @@
     Previously, when jobs had more than a few thousand tasks they
     could crash web browsers.  (Mahadev Konar via cutting)
 
+36. HADOOP-178.  In DFS, piggyback blockwork requests from datanodes
+    on heartbeat responses from namenode.  This reduces the volume of
+    RPC traffic.  Also move startup delay in blockwork from datanode
+    to namenode.  This fixes a problem where restarting the namenode
+    triggered a lot of unneeded replication. (Hairong Kuang via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon May  1 12:45:24 2006
@@ -89,7 +89,6 @@
     int xmitsInProgress = 0;
     Daemon dataXceiveServer = null;
     long blockReportInterval;
-    private long datanodeStartupPeriod;
 
     /**
      * Create the DataNode given a configuration and a dataDir.
@@ -128,8 +127,6 @@
           conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
         this.blockReportInterval =
           blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
-        this.datanodeStartupPeriod =
-          conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
     }
 
     /**
@@ -159,7 +156,6 @@
      */
     public void offerService() throws Exception {
         long lastHeartbeat = 0, lastBlockReport = 0;
-        long sendStart = System.currentTimeMillis();
         LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
         //
@@ -171,100 +167,91 @@
             //
             // Every so often, send heartbeat or block-report
             //
-            synchronized (receivedBlockList) {
-                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+            if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+                //
+                // All heartbeat messages include following info:
+                // -- Datanode name
+                // -- data transfer port
+                // -- Total capacity
+                // -- Bytes remaining
+                //
+                BlockCommand cmd = namenode.sendHeartbeat(localName, 
+                        data.getCapacity(), data.getRemaining(), xmitsInProgress);
+                //LOG.info("Just sent heartbeat, with name " + localName);
+                lastHeartbeat = now;
+
+                if (cmd != null && cmd.transferBlocks()) {
+                    //
+                    // Send a copy of a block to another datanode
+                    //
+                    Block blocks[] = cmd.getBlocks();
+                    DatanodeInfo xferTargets[][] = cmd.getTargets();
+                        
+                    for (int i = 0; i < blocks.length; i++) {
+                        if (!data.isValidBlock(blocks[i])) {
+                            String errStr = "Can't send invalid block " + blocks[i];
+                            LOG.info(errStr);
+                            namenode.errorReport(localName, errStr);
+                            break;
+                        } else {
+                            if (xferTargets[i].length > 0) {
+                                LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+                                new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+                            }
+                        }
+                    }
+                } else if (cmd != null && cmd.invalidateBlocks()) {
                     //
-                    // All heartbeat messages include following info:
-                    // -- Datanode name
-                    // -- data transfer port
-                    // -- Total capacity
-                    // -- Bytes remaining
-                    //
-                    namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
-                    //LOG.info("Just sent heartbeat, with name " + localName);
-                    lastHeartbeat = now;
-		}
-		if (now - lastBlockReport > blockReportInterval) {
-                    //
-                    // Send latest blockinfo report if timer has expired.
-                    // Get back a list of local block(s) that are obsolete
-                    // and can be safely GC'ed.
-                    //
-                    Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
-                    data.invalidate(toDelete);
-                    lastBlockReport = now;
-                    continue;
-		}
-		if (receivedBlockList.size() > 0) {
+                    // Some local block(s) are obsolete and can be 
+                    // safely garbage-collected.
+                    //
+                    data.invalidate(cmd.getBlocks());
+                }
+            }
+            
+            // send block report
+            if (now - lastBlockReport > blockReportInterval) {
+                //
+                // Send latest blockinfo report if timer has expired.
+                // Get back a list of local block(s) that are obsolete
+                // and can be safely GC'ed.
+                //
+                Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
+                data.invalidate(toDelete);
+                lastBlockReport = now;
+                continue;
+            }
+            
+            // check if there are newly received blocks
+            Block [] blockArray=null;
+            synchronized( receivedBlockList ) {
+                if (receivedBlockList.size() > 0) {
                     //
                     // Send newly-received blockids to namenode
                     //
-                    Block blockArray[] = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
+                    blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
                     receivedBlockList.removeAllElements();
-                    namenode.blockReceived(localName, blockArray);
-                }
-
-		//
-		// Only perform block operations (transfer, delete) after 
-		// a startup quiet period.  The assumption is that all the
-		// datanodes will be started together, but the namenode may
-		// have been started some time before.  (This is esp. true in
-		// the case of network interruptions.)  So, wait for some time
-		// to pass from the time of connection to the first block-transfer.
-		// Otherwise we transfer a lot of blocks unnecessarily.
-		//
-		if (now - sendStart > datanodeStartupPeriod) {
-		    //
-		    // Check to see if there are any block-instructions from the
-		    // namenode that this datanode should perform.
-		    //
-		    BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
-		    if (cmd != null && cmd.transferBlocks()) {
-			//
-			// Send a copy of a block to another datanode
-			//
-			Block blocks[] = cmd.getBlocks();
-			DatanodeInfo xferTargets[][] = cmd.getTargets();
-			
-			for (int i = 0; i < blocks.length; i++) {
-			    if (!data.isValidBlock(blocks[i])) {
-				String errStr = "Can't send invalid block " + blocks[i];
-				LOG.info(errStr);
-				namenode.errorReport(localName, errStr);
-				break;
-			    } else {
-				if (xferTargets[i].length > 0) {
-				    LOG.info("Starting thread to transfer block " + blocks[i] 
-                   + " to " + xferTargets[i][0].getName()
-                   + (xferTargets[i].length > 1 ? " and " 
-                   + (xferTargets[i].length-1) + " more destination(s)" : "" ));
-				    new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
-				}
-			    }
-			}
-                    } else if (cmd != null && cmd.invalidateBlocks()) {
-                        //
-                        // Some local block(s) are obsolete and can be 
-                        // safely garbage-collected.
-                        //
-                        data.invalidate(cmd.getBlocks());
-                    }
                 }
-
-                //
-                // There is no work to do;  sleep until hearbeat timer elapses, 
-                // or work arrives, and then iterate again.
-                //
-                long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
+            }
+            if( blockArray != null ) {
+                namenode.blockReceived(localName, blockArray);
+            }
+            
+            //
+            // There is no work to do;  sleep until heartbeat timer elapses, 
+            // or work arrives, and then iterate again.
+            //
+            long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
+            synchronized( receivedBlockList ) {
                 if (waitTime > 0 && receivedBlockList.size() == 0) {
                     try {
                         receivedBlockList.wait(waitTime);
                     } catch (InterruptedException ie) {
                     }
                 }
-            }
-        }
-    }
+            } // synchronized
+        } // while (shouldRun)
+    } // offerService
 
     /**
      * Server used for receiving/sending a block of data.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon May  1 12:45:24 2006
@@ -30,9 +30,14 @@
 interface DatanodeProtocol {
     /**
      * sendHeartbeat() tells the NameNode that the DataNode is still
-     * alive and well.  Includes some status info, too.
+     * alive and well.  Includes some status info, too. 
+     * It also gives the NameNode a chance to return a "BlockCommand" object.
+     * A BlockCommand tells the DataNode to invalidate local block(s), 
+     * or to copy them to other DataNodes, etc.
      */
-    public void sendHeartbeat(String sender, long capacity, long remaining) throws IOException;
+    public BlockCommand sendHeartbeat(String sender, 
+            long capacity, long remaining,
+            int xmitsInProgress) throws IOException;
 
     /**
      * blockReport() tells the NameNode about all the locally-stored blocks.
@@ -56,13 +61,4 @@
      * awry.  Useful for debugging.
      */
     public void errorReport(String sender, String msg) throws IOException;
-
-    /**
-     * The DataNode periodically calls getBlockwork().  It includes a
-     * small amount of status information, but mainly gives the NameNode
-     * a chance to return a "BlockCommand" object.  A BlockCommand tells
-     * the DataNode to invalidate local block(s), or to copy them to other 
-     * DataNodes, etc.
-     */
-    public BlockCommand getBlockwork(String sender, int xmitsInProgress) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon May  1 12:45:24 2006
@@ -61,7 +61,9 @@
     private FSNamesystem namesystem;
     private Server server;
     private int handlerCount = 2;
-
+    private long datanodeStartupPeriod;
+    private volatile long firstBlockReportTime;
+    
     /** only used for testing purposes  */
     private boolean stopRequested = false;
 
@@ -83,10 +85,12 @@
     /**
      * Create a NameNode at the specified location and start it.
      */
-    public NameNode(File dir, int port, Configuration conf) throws IOException {
+    public NameNode(File dir, int port, Configuration conf) throws IOException {     
         this.namesystem = new FSNamesystem(dir, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
         this.server = RPC.getServer(this, port, handlerCount, false, conf);
+        this.datanodeStartupPeriod =
+            conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
         this.server.start();
     }
 
@@ -353,34 +357,31 @@
     // DatanodeProtocol
     ////////////////////////////////////////////////////////////////
     /**
-     */
-    public void sendHeartbeat(String sender, long capacity, long remaining) {
-        namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
-    }
-
-    public Block[] blockReport(String sender, Block blocks[]) {
-        LOG.info("Block report from "+sender+": "+blocks.length+" blocks.");
-        return namesystem.processReport(blocks, new UTF8(sender));
-    }
-
-    public void blockReceived(String sender, Block blocks[]) {
-        for (int i = 0; i < blocks.length; i++) {
-            namesystem.blockReceived(blocks[i], new UTF8(sender));
-        }
-    }
-
-    /**
-     */
-    public void errorReport(String sender, String msg) {
-        // Log error message from datanode
-        //LOG.info("Report from " + sender + ": " + msg);
-    }
-
-    /**
+     * The data node notifies the name node that it is alive.
      * Return a block-oriented command for the datanode to execute.
      * This will be either a transfer or a delete operation.
      */
-    public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
+    public BlockCommand sendHeartbeat(String sender, long capacity, long remaining,
+            int xmitsInProgress) {
+        namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);        
+        
+        //
+        // Only ask datanodes to perform block operations (transfer, delete) 
+        // after a startup quiet period.  The assumption is that all the
+        // datanodes will be started together, but the namenode may
+        // have been started some time before.  (This is esp. true in
+        // the case of network interruptions.)  So, wait for some time
+        // to pass from the time of connection to the first block-transfer.
+        // Otherwise we transfer a lot of blocks unnecessarily.
+        //
+        // Hairong: Ideally in addition we also look at the history. For example,
+        // we should wait until at least 98% of datanodes are connected to the server
+        //
+        if( firstBlockReportTime==0 ||
+            System.currentTimeMillis()-firstBlockReportTime < datanodeStartupPeriod) {
+            return null;
+        }
+        
         //
         // Ask to perform pending transfers, if any
         //
@@ -400,6 +401,26 @@
             return new BlockCommand(blocks);
         }
         return null;
+    }
+
+    public Block[] blockReport(String sender, Block blocks[]) {
+        if( firstBlockReportTime==0)
+              firstBlockReportTime=System.currentTimeMillis();
+
+        return namesystem.processReport(blocks, new UTF8(sender));
+     }
+
+    public void blockReceived(String sender, Block blocks[]) {
+        for (int i = 0; i < blocks.length; i++) {
+            namesystem.blockReceived(blocks[i], new UTF8(sender));
+        }
+    }
+
+    /**
+     */
+    public void errorReport(String sender, String msg) {
+        // Log error message from datanode
+        //LOG.info("Report from " + sender + ": " + msg);
     }
 
     /**