You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/01 21:45:26 UTC
svn commit: r398668 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/DataNode.java
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
src/java/org/apache/hadoop/dfs/NameNode.java
Author: cutting
Date: Mon May 1 12:45:24 2006
New Revision: 398668
URL: http://svn.apache.org/viewcvs?rev=398668&view=rev
Log:
HADOOP-178. Piggyback DFS blockwork requests on heartbeat responses, reducing traffic. Also move blockwork delay on startup from datanode to namenode, fixing a problem when the namenode alone restarts. Contributed by Hairong Kuang.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 1 12:45:24 2006
@@ -132,6 +132,12 @@
Previously, when jobs had more than a few thousand tasks they
could crash web browsers. (Mahadev Konar via cutting)
+36. HADOOP-178. In DFS, piggyback blockwork requests from datanodes
+ on heartbeat responses from namenode. This reduces the volume of
+ RPC traffic. Also move startup delay in blockwork from datanode
+ to namenode. This fixes a problem where restarting the namenode
+ triggered a lot of unneeded replication. (Hairong Kuang via cutting)
+
Release 0.1.1 - 2006-04-08
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon May 1 12:45:24 2006
@@ -89,7 +89,6 @@
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
long blockReportInterval;
- private long datanodeStartupPeriod;
/**
* Create the DataNode given a configuration and a dataDir.
@@ -128,8 +127,6 @@
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
this.blockReportInterval =
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
- this.datanodeStartupPeriod =
- conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
}
/**
@@ -159,7 +156,6 @@
*/
public void offerService() throws Exception {
long lastHeartbeat = 0, lastBlockReport = 0;
- long sendStart = System.currentTimeMillis();
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
//
@@ -171,100 +167,91 @@
//
// Every so often, send heartbeat or block-report
//
- synchronized (receivedBlockList) {
- if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+ if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+ //
+ // All heartbeat messages include following info:
+ // -- Datanode name
+ // -- data transfer port
+ // -- Total capacity
+ // -- Bytes remaining
+ //
+ BlockCommand cmd = namenode.sendHeartbeat(localName,
+ data.getCapacity(), data.getRemaining(), xmitsInProgress);
+ //LOG.info("Just sent heartbeat, with name " + localName);
+ lastHeartbeat = now;
+
+ if (cmd != null && cmd.transferBlocks()) {
+ //
+ // Send a copy of a block to another datanode
+ //
+ Block blocks[] = cmd.getBlocks();
+ DatanodeInfo xferTargets[][] = cmd.getTargets();
+
+ for (int i = 0; i < blocks.length; i++) {
+ if (!data.isValidBlock(blocks[i])) {
+ String errStr = "Can't send invalid block " + blocks[i];
+ LOG.info(errStr);
+ namenode.errorReport(localName, errStr);
+ break;
+ } else {
+ if (xferTargets[i].length > 0) {
+ LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+ new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+ }
+ }
+ }
+ } else if (cmd != null && cmd.invalidateBlocks()) {
//
- // All heartbeat messages include following info:
- // -- Datanode name
- // -- data transfer port
- // -- Total capacity
- // -- Bytes remaining
- //
- namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
- //LOG.info("Just sent heartbeat, with name " + localName);
- lastHeartbeat = now;
- }
- if (now - lastBlockReport > blockReportInterval) {
- //
- // Send latest blockinfo report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
- data.invalidate(toDelete);
- lastBlockReport = now;
- continue;
- }
- if (receivedBlockList.size() > 0) {
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ data.invalidate(cmd.getBlocks());
+ }
+ }
+
+ // send block report
+ if (now - lastBlockReport > blockReportInterval) {
+ //
+ // Send latest blockinfo report if timer has expired.
+ // Get back a list of local block(s) that are obsolete
+ // and can be safely GC'ed.
+ //
+ Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
+ data.invalidate(toDelete);
+ lastBlockReport = now;
+ continue;
+ }
+
+ // check if there are newly received blocks
+ Block [] blockArray=null;
+ synchronized( receivedBlockList ) {
+ if (receivedBlockList.size() > 0) {
//
// Send newly-received blockids to namenode
//
- Block blockArray[] = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
+ blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
receivedBlockList.removeAllElements();
- namenode.blockReceived(localName, blockArray);
- }
-
- //
- // Only perform block operations (transfer, delete) after
- // a startup quiet period. The assumption is that all the
- // datanodes will be started together, but the namenode may
- // have been started some time before. (This is esp. true in
- // the case of network interruptions.) So, wait for some time
- // to pass from the time of connection to the first block-transfer.
- // Otherwise we transfer a lot of blocks unnecessarily.
- //
- if (now - sendStart > datanodeStartupPeriod) {
- //
- // Check to see if there are any block-instructions from the
- // namenode that this datanode should perform.
- //
- BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
- if (cmd != null && cmd.transferBlocks()) {
- //
- // Send a copy of a block to another datanode
- //
- Block blocks[] = cmd.getBlocks();
- DatanodeInfo xferTargets[][] = cmd.getTargets();
-
- for (int i = 0; i < blocks.length; i++) {
- if (!data.isValidBlock(blocks[i])) {
- String errStr = "Can't send invalid block " + blocks[i];
- LOG.info(errStr);
- namenode.errorReport(localName, errStr);
- break;
- } else {
- if (xferTargets[i].length > 0) {
- LOG.info("Starting thread to transfer block " + blocks[i]
- + " to " + xferTargets[i][0].getName()
- + (xferTargets[i].length > 1 ? " and "
- + (xferTargets[i].length-1) + " more destination(s)" : "" ));
- new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
- }
- }
- }
- } else if (cmd != null && cmd.invalidateBlocks()) {
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- data.invalidate(cmd.getBlocks());
- }
}
-
- //
- // There is no work to do; sleep until hearbeat timer elapses,
- // or work arrives, and then iterate again.
- //
- long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
+ }
+ if( blockArray != null ) {
+ namenode.blockReceived(localName, blockArray);
+ }
+
+ //
+ // There is no work to do; sleep until heartbeat timer elapses,
+ // or work arrives, and then iterate again.
+ //
+ long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
+ synchronized( receivedBlockList ) {
if (waitTime > 0 && receivedBlockList.size() == 0) {
try {
receivedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
}
}
- }
- }
- }
+ } // synchronized
+ } // while (shouldRun)
+ } // offerService
/**
* Server used for receiving/sending a block of data.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon May 1 12:45:24 2006
@@ -30,9 +30,14 @@
interface DatanodeProtocol {
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
- * alive and well. Includes some status info, too.
+ * alive and well. Includes some status info, too.
+ * It also gives the NameNode a chance to return a "BlockCommand" object.
+ * A BlockCommand tells the DataNode to invalidate local block(s),
+ * or to copy them to other DataNodes, etc.
*/
- public void sendHeartbeat(String sender, long capacity, long remaining) throws IOException;
+ public BlockCommand sendHeartbeat(String sender,
+ long capacity, long remaining,
+ int xmitsInProgress) throws IOException;
/**
* blockReport() tells the NameNode about all the locally-stored blocks.
@@ -56,13 +61,4 @@
* awry. Useful for debugging.
*/
public void errorReport(String sender, String msg) throws IOException;
-
- /**
- * The DataNode periodically calls getBlockwork(). It includes a
- * small amount of status information, but mainly gives the NameNode
- * a chance to return a "BlockCommand" object. A BlockCommand tells
- * the DataNode to invalidate local block(s), or to copy them to other
- * DataNodes, etc.
- */
- public BlockCommand getBlockwork(String sender, int xmitsInProgress) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=398668&r1=398667&r2=398668&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon May 1 12:45:24 2006
@@ -61,7 +61,9 @@
private FSNamesystem namesystem;
private Server server;
private int handlerCount = 2;
-
+ private long datanodeStartupPeriod;
+ private volatile long firstBlockReportTime;
+
/** only used for testing purposes */
private boolean stopRequested = false;
@@ -83,10 +85,12 @@
/**
* Create a NameNode at the specified location and start it.
*/
- public NameNode(File dir, int port, Configuration conf) throws IOException {
+ public NameNode(File dir, int port, Configuration conf) throws IOException {
this.namesystem = new FSNamesystem(dir, conf);
this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
this.server = RPC.getServer(this, port, handlerCount, false, conf);
+ this.datanodeStartupPeriod =
+ conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
this.server.start();
}
@@ -353,34 +357,31 @@
// DatanodeProtocol
////////////////////////////////////////////////////////////////
/**
- */
- public void sendHeartbeat(String sender, long capacity, long remaining) {
- namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
- }
-
- public Block[] blockReport(String sender, Block blocks[]) {
- LOG.info("Block report from "+sender+": "+blocks.length+" blocks.");
- return namesystem.processReport(blocks, new UTF8(sender));
- }
-
- public void blockReceived(String sender, Block blocks[]) {
- for (int i = 0; i < blocks.length; i++) {
- namesystem.blockReceived(blocks[i], new UTF8(sender));
- }
- }
-
- /**
- */
- public void errorReport(String sender, String msg) {
- // Log error message from datanode
- //LOG.info("Report from " + sender + ": " + msg);
- }
-
- /**
+ * Data node notify the name node that it is alive
* Return a block-oriented command for the datanode to execute.
* This will be either a transfer or a delete operation.
*/
- public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
+ public BlockCommand sendHeartbeat(String sender, long capacity, long remaining,
+ int xmitsInProgress) {
+ namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
+
+ //
+ // Only ask datanodes to perform block operations (transfer, delete)
+ // after a startup quiet period. The assumption is that all the
+ // datanodes will be started together, but the namenode may
+ // have been started some time before. (This is esp. true in
+ // the case of network interruptions.) So, wait for some time
+ // to pass from the time of connection to the first block-transfer.
+ // Otherwise we transfer a lot of blocks unnecessarily.
+ //
+ // Hairong: Ideally in addition we also look at the history. For example,
+ // we should wait until at least 98% of datanodes are connected to the server
+ //
+ if( firstBlockReportTime==0 ||
+ System.currentTimeMillis()-firstBlockReportTime < datanodeStartupPeriod) {
+ return null;
+ }
+
//
// Ask to perform pending transfers, if any
//
@@ -400,6 +401,26 @@
return new BlockCommand(blocks);
}
return null;
+ }
+
+ public Block[] blockReport(String sender, Block blocks[]) {
+ if( firstBlockReportTime==0)
+ firstBlockReportTime=System.currentTimeMillis();
+
+ return namesystem.processReport(blocks, new UTF8(sender));
+ }
+
+ public void blockReceived(String sender, Block blocks[]) {
+ for (int i = 0; i < blocks.length; i++) {
+ namesystem.blockReceived(blocks[i], new UTF8(sender));
+ }
+ }
+
+ /**
+ */
+ public void errorReport(String sender, String msg) {
+ // Log error message from datanode
+ //LOG.info("Report from " + sender + ": " + msg);
}
/**