You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/07/19 20:21:31 UTC
svn commit: r219743 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: DataNode.java
DatanodeProtocol.java FSConstants.java FSDataset.java FSNamesystem.java
NameNode.java
Author: mc
Date: Tue Jul 19 11:21:19 2005
New Revision: 219743
URL: http://svn.apache.org/viewcvs?rev=219743&view=rev
Log:
Fix a few problems with block replication.
1) Wait until the datanode has been up for X seconds before
it acts as a source of block replications. We had a timer
at the namenode previously, which is not sufficient.
2) Throttle replications so a single node does not emit more
than K at a time.
3) There was a bug where an unexpected error during file
creation would make it seem as if the write were still in progress. Fixed.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Tue Jul 19 11:21:19 2005
@@ -44,12 +44,6 @@
//private static long numGigs = NutchConf.get().getLong("ndfs.datanode.maxgigs", 100);
//
- //
- // Eventually, this constant should be computed dynamically using
- // load information
- //
- private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3;
-
/**
* Util method to build socket addr from string
*/
@@ -69,6 +63,7 @@
FSDataset data;
String localName;
Vector receivedBlockList = new Vector();
+ int xmitsInProgress = 0;
/**
* Create using configured defaults and dataDir.
@@ -155,39 +150,49 @@
namenode.blockReceived(localName, blockArray);
}
- //
- // Check to see if there are any block-instructions from the
- // namenode that this datanode should perform.
- //
- BlockCommand cmd = namenode.getBlockwork(localName);
- if (cmd.transferBlocks()) {
- //
- // Send a copy of a block to another datanode
- //
- Block blocks[] = cmd.getBlocks();
- DatanodeInfo xferTargets[][] = cmd.getTargets();
-
- for (int i = 0; i < blocks.length; i++) {
- if (!data.isValidBlock(blocks[i])) {
- String errStr = "Can't send invalid block " + blocks[i];
- LOG.info(errStr);
- namenode.errorReport(localName, errStr);
- break;
- } else {
- if (xferTargets[i].length > 0) {
- LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
- new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
- }
- }
- }
- } else if (cmd.invalidateBlocks()) {
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- data.invalidate(cmd.getBlocks());
- }
-
+ //
+ // Only perform block operations (transfer, delete) after
+ // a startup quiet period. The assumption is that all the
+ // datanodes will be started together, but the namenode may
+ // have been started some time before. (This is esp. true in
+ // the case of network interruptions.) So, wait for some time
+ // to pass from the time of connection to the first block-transfer.
+ // Otherwise we transfer a lot of blocks unnecessarily.
+ //
+ if (now - sendStart > DATANODE_STARTUP_PERIOD) {
+ //
+ // Check to see if there are any block-instructions from the
+ // namenode that this datanode should perform.
+ //
+ BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
+ if (cmd.transferBlocks()) {
+ //
+ // Send a copy of a block to another datanode
+ //
+ Block blocks[] = cmd.getBlocks();
+ DatanodeInfo xferTargets[][] = cmd.getTargets();
+
+ for (int i = 0; i < blocks.length; i++) {
+ if (!data.isValidBlock(blocks[i])) {
+ String errStr = "Can't send invalid block " + blocks[i];
+ LOG.info(errStr);
+ namenode.errorReport(localName, errStr);
+ break;
+ } else {
+ if (xferTargets[i].length > 0) {
+ LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+ new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+ }
+ }
+ }
+ } else if (cmd.invalidateBlocks()) {
+ //
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ data.invalidate(cmd.getBlocks());
+ }
+ }
//
// There is no work to do; sleep until hearbeat timer elapses,
@@ -501,6 +506,7 @@
* Do the deed, write the bytes
*/
public void run() {
+ xmitsInProgress++;
try {
Socket s = new Socket(curTarget.getAddress(), curTarget.getPort());
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
@@ -534,9 +540,11 @@
} finally {
out.close();
}
- LOG.info("Replicated block " + b + " to " + curTarget);
+ LOG.info("Transmitted block " + b + " to " + curTarget);
} catch (IOException ie) {
- }
+ } finally {
+ xmitsInProgress--;
+ }
}
}
@@ -555,6 +563,7 @@
try {
datanode.offerService();
} catch (Exception ex) {
+ LOG.info("Exception: " + ex.toString());
LOG.info("Lost connection to namenode [" + datanode.getNamenode() + "]. Retrying...");
try {
Thread.sleep(5000);
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java Tue Jul 19 11:21:19 2005
@@ -32,5 +32,5 @@
public void blockReceived(String sender, Block blocks[]);
public void errorReport(String sender, String msg);
- public BlockCommand getBlockwork(String sender);
+ public BlockCommand getBlockwork(String sender, int xmitsInProgress);
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java Tue Jul 19 11:21:19 2005
@@ -102,7 +102,6 @@
public static long EXPIRE_INTERVAL = 4 * HEARTBEAT_INTERVAL;
public static long BLOCKREPORT_INTERVAL = 9 * 60 * 1000;
public static long OBSOLETE_INTERVAL = 10 * 60 * 1000;
- public static long SYSTEM_STARTUP_PERIOD = 15 * 1000;
- public static long DATANODE_STARTUP_PERIOD = 30 * 1000;
+ public static long DATANODE_STARTUP_PERIOD = 120 * 1000;
public static long LEASE_PERIOD = 16 * 1000;
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java Tue Jul 19 11:21:19 2005
@@ -283,16 +283,21 @@
reserved += BLOCK_SIZE;
f = getTmpFile(b);
- if (f.exists()) {
- throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should not be present, but is.");
- }
- }
+ try {
+ if (f.exists()) {
+ throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should not be present, but is.");
+ }
- //
- // Create the zero-length temp file
- //
- if (!f.createNewFile()) {
- throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should be creatable, but is already present.");
+ //
+ // Create the zero-length temp file
+ //
+ if (!f.createNewFile()) {
+ throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should be creatable, but is already present.");
+ }
+ } catch (IOException ie) {
+ ongoingCreates.remove(b);
+ reserved -= BLOCK_SIZE;
+ }
}
//
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Tue Jul 19 11:21:19 2005
@@ -38,6 +38,9 @@
final static int DESIRED_REPLICATION =
NutchConf.get().getInt("ndfs.replication", 3);
+ // How many outgoing replication streams a given node should have at one time
+ final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
+
// MIN_REPLICATION is how many copies we need in place or else we disallow the write
final static int MIN_REPLICATION = 1;
@@ -990,7 +993,7 @@
*/
public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
- if (nodeInfo == null || System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
+ if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
return null;
} else {
nodeInfo.updateObsoleteCheck();
@@ -1017,17 +1020,10 @@
* target sequence for the Block at the appropriate index.
*
*/
- public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int maxXfers) {
- //
- // Allow the namenode to come up and hear from all datanodes before
- // making transfers.
- //
- if (System.currentTimeMillis() - systemStart < SYSTEM_STARTUP_PERIOD) {
- return null;
- }
-
+ public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {
synchronized (neededReplications) {
Object results[] = null;
+ int scheduledXfers = 0;
if (neededReplications.size() > 0) {
//
@@ -1041,7 +1037,7 @@
//
// We can only reply with 'maxXfers' or fewer blocks
//
- if (replicateBlocks.size() >= maxXfers) {
+ if (scheduledXfers >= MAX_REPLICATION_STREAMS - xmitsInProgress) {
break;
}
@@ -1052,11 +1048,12 @@
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
if (containingNodes.contains(srcNode)) {
- DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION - containingNodes.size(), containingNodes);
+ DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
if (targets.length > 0) {
// Build items to return
replicateBlocks.add(block);
replicateTargetSets.add(targets);
+ scheduledXfers += targets.length;
}
}
}
@@ -1064,9 +1061,9 @@
//
// Move the block-replication into a "pending" state.
- // REMIND - mjc - the reason we use 'pending' is so we can retry
+ // The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
- // This is not yet implemented
+ // (REMIND - mjc - this timer is not yet implemented.)
//
if (replicateBlocks.size() > 0) {
int i = 0;
@@ -1079,13 +1076,14 @@
neededReplications.remove(block);
pendingReplications.add(block);
}
+
+ LOG.info("Pending transfer (block " + block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length + " destinations");
}
//
// Build returned objects from above lists
//
DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][];
- LOG.info("Pending transfer from " + srcNode.getName() + " to " + targetMatrix.length + " destinations");
for (i = 0; i < targetMatrix.length; i++) {
targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=219743&r1=219742&r2=219743&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Tue Jul 19 11:21:19 2005
@@ -36,12 +36,6 @@
* @author Mike Cafarella
**********************************************************/
public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
- //
- // Eventually, this constant should be computed dynamically using
- // load information
- //
- private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3;
-
FSNamesystem namesystem;
Server server;
@@ -243,11 +237,11 @@
/**
* Return a block-oriented command for the datanode to execute
*/
- public BlockCommand getBlockwork(String sender) {
+ public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
//
// Ask to perform pending transfers, if any
//
- Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), MAX_BLOCKS_PER_ROUNDTRIP);
+ Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress);
if (xferResults != null) {
return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
}