You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/26 22:26:40 UTC
svn commit: r240336 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: DataNode.java
DatanodeInfo.java DatanodeProtocol.java FSConstants.java FSNamesystem.java
NameNode.java
Author: mc
Date: Fri Aug 26 13:26:38 2005
New Revision: 240336
URL: http://svn.apache.org/viewcvs?rev=240336&view=rev
Log:
Change a few things about block-replication.
1) Merge OBSOLETE and BLOCKREPORT checks.
2) Adjust constants. Block reports now happen every ten minutes (BLOCKREPORT_INTERVAL = 10 * 60 * 1000 ms).
3) Delete over-replicated blocks. Over-replication can
happen when a datanode goes offline, then returns.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Fri Aug 26 13:26:38 2005
@@ -148,14 +148,17 @@
// -- Bytes remaining
//
namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
- LOG.info("Just sent heartbeat, with name " + localName);
+ //LOG.info("Just sent heartbeat, with name " + localName);
lastHeartbeat = now;
}
if (now - lastBlockReport > blockReportInterval) {
//
- // Send latest blockinfo report if timer has expired
+ // Send latest blockinfo report if timer has expired.
+ // Get back a list of local block(s) that are obsolete
+ // and can be safely GC'ed.
//
- namenode.blockReport(localName, data.getBlockReport());
+ Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
+ data.invalidate(toDelete);
lastBlockReport = now;
}
if (receivedBlockList.size() > 0) {
@@ -182,7 +185,7 @@
// namenode that this datanode should perform.
//
BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
- if (cmd.transferBlocks()) {
+ if (cmd != null && cmd.transferBlocks()) {
//
// Send a copy of a block to another datanode
//
@@ -202,14 +205,14 @@
}
}
}
- } else if (cmd.invalidateBlocks()) {
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- data.invalidate(cmd.getBlocks());
- }
- }
+ } else if (cmd != null && cmd.invalidateBlocks()) {
+ //
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ data.invalidate(cmd.getBlocks());
+ }
+ }
//
// There is no work to do; sleep until heartbeat timer elapses,
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeInfo.java Fri Aug 26 13:26:38 2005
@@ -28,7 +28,7 @@
**************************************************/
public class DatanodeInfo implements Writable, Comparable {
UTF8 name;
- long capacity, remaining, lastUpdate, lastObsoleteCheck;
+ long capacity, remaining, lastUpdate;
volatile TreeSet blocks;
/**
@@ -41,7 +41,6 @@
this.name = name;
int colon = name.toString().indexOf(":");
this.blocks = new TreeSet();
- this.lastObsoleteCheck = System.currentTimeMillis();
updateHeartbeat(0, 0);
}
@@ -50,7 +49,6 @@
public DatanodeInfo(UTF8 name, long capacity, long remaining) {
this.name = name;
this.blocks = new TreeSet();
- this.lastObsoleteCheck = System.currentTimeMillis();
updateHeartbeat(capacity, remaining);
}
@@ -105,12 +103,6 @@
}
public long lastUpdate() {
return lastUpdate;
- }
- public void updateObsoleteCheck() {
- this.lastObsoleteCheck = System.currentTimeMillis();
- }
- public long lastObsoleteCheck() {
- return lastObsoleteCheck;
}
/////////////////////////////////////////////////
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DatanodeProtocol.java Fri Aug 26 13:26:38 2005
@@ -28,7 +28,7 @@
public interface DatanodeProtocol {
public void sendHeartbeat(String sender, long capacity, long remaining) throws IOException;
- public void blockReport(String sender, Block blocks[]) throws IOException;
+ public Block[] blockReport(String sender, Block blocks[]) throws IOException;
public void blockReceived(String sender, Block blocks[]) throws IOException;
public void errorReport(String sender, String msg) throws IOException;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java Fri Aug 26 13:26:38 2005
@@ -100,8 +100,7 @@
//
public static long HEARTBEAT_INTERVAL = 3 * 1000;
public static long EXPIRE_INTERVAL = 60 * 1000;
- public static long BLOCKREPORT_INTERVAL = 9 * 60 * 1000;
- public static long OBSOLETE_INTERVAL = 10 * 60 * 1000;
+ public static long BLOCKREPORT_INTERVAL = 10 * 60 * 1000;
public static long DATANODE_STARTUP_PERIOD = 120 * 1000;
public static long LEASE_PERIOD = 16 * 1000;
public static int READ_TIMEOUT = 20 * 1000;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Aug 26 13:26:38 2005
@@ -38,6 +38,9 @@
final static int DESIRED_REPLICATION =
NutchConf.get().getInt("ndfs.replication", 3);
+ // The maximum number of replicates we should allow for a single block
+ final static int MAX_REPLICATION = DESIRED_REPLICATION;
+
// How many outgoing replication streams a given node should have at one time
final static int MAX_REPLICATION_STREAMS = NutchConf.get().getInt("ndfs.max-repl-streams", 2);
@@ -76,6 +79,13 @@
TreeMap recentInvalidateSets = new TreeMap();
//
+ // Keeps a TreeSet for every named node. Each treeset contains
+ // a list of the blocks that are "extra" at that location. We'll
+ // eventually remove these extras.
+ //
+ TreeMap excessReplicateMap = new TreeMap();
+
+ //
// Keeps track of files that are being created, plus the
// blocks that make them up.
//
@@ -825,7 +835,7 @@
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
- public synchronized void processReport(Block newReport[], UTF8 name) {
+ public synchronized Block[] processReport(Block newReport[], UTF8 name) {
DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
if (node == null) {
throw new IllegalArgumentException("Unexpected exception. Received block report from node " + name + ", but there is no info for " + name);
@@ -869,6 +879,29 @@
// Modify node so it has the new blockreport
//
node.updateBlocks(newReport);
+
+ //
+ // We've now completely updated the node's block report profile.
+ // We now go through all its blocks and find which ones are invalid,
+ // no longer pending, or over-replicated.
+ //
+ // (Note it's not enough to just invalidate blocks at lease expiry
+ // time; datanodes can go down before the client's lease on
+ // the failed file expires and miss the "expire" event.)
+ //
+ // This function considers every block on a datanode, and thus
+ // should only be invoked infrequently.
+ //
+ Vector obsolete = new Vector();
+ for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
+ Block b = (Block) it.next();
+
+ if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+ LOG.info("Obsoleting block " + b);
+ obsolete.add(b);
+ }
+ }
+ return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
}
/**
@@ -889,7 +922,6 @@
synchronized (neededReplications) {
if (dir.isValidBlock(block)) {
- //LOG.info("Node " + node + " is reporting stored block " + block);
if (containingNodes.size() >= DESIRED_REPLICATION) {
neededReplications.remove(block);
pendingReplications.remove(block);
@@ -898,11 +930,68 @@
neededReplications.add(block);
}
}
+
+ //
+ // Find how many of the containing nodes are "extra", if any.
+ // If there are any extras, call chooseExcessReplicates() to
+ // mark them in the excessReplicateMap.
+ //
+ Vector nonExcess = new Vector();
+ for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
+ DatanodeInfo cur = (DatanodeInfo) it.next();
+ TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+ if (excessBlocks == null || ! excessBlocks.contains(block)) {
+ nonExcess.add(cur);
+ }
+ }
+ if (nonExcess.size() > MAX_REPLICATION) {
+ chooseExcessReplicates(nonExcess, block, MAX_REPLICATION);
+ }
}
}
}
/**
+ * We want a max of "maxReps" replicates for any block, but we now have too many.
+ * In this method, remove randomly-chosen nodes from 'nonExcess' (marking block 'b' as excess and to-be-invalidated on each) until:
+ *
+ * nonExcess.size() == maxReps
+ *
+ * For now, we choose nodes randomly. In the future, we might enforce some
+ * kind of policy (like making sure replicates are spread across racks).
+ */
+ void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
+ while (nonExcess.size() - maxReps > 0) {
+ int chosenNode = r.nextInt(nonExcess.size());
+ DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
+ nonExcess.removeElementAt(chosenNode);
+
+ TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+ if (excessBlocks == null) {
+ excessBlocks = new TreeSet();
+ excessReplicateMap.put(cur.getName(), excessBlocks);
+ }
+ excessBlocks.add(b);
+
+ //
+ // The 'excessblocks' tracks blocks until we get confirmation
+ // that the datanode has deleted them; the only way we remove them
+ // is when we get a "removeBlock" message.
+ //
+ // The 'invalidate' list is used to inform the datanode the block
+ // should be deleted. Items are removed from the invalidate list
+ // upon giving instructions to the datanode.
+ //
+ Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
+ if (invalidateSet == null) {
+ invalidateSet = new Vector();
+ recentInvalidateSets.put(cur.getName(), invalidateSet);
+ }
+ invalidateSet.add(b);
+ }
+ }
+
+ /**
* Modify (block-->datanode) map. Possibly generate
* replication tasks, if the removed block is still valid.
*/
@@ -924,6 +1013,18 @@
neededReplications.add(block);
}
}
+
+ //
+ // We've removed a block from a node, so it's definitely no longer
+ // in "excess" there.
+ //
+ TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
+ if (excessBlocks != null) {
+ excessBlocks.remove(block);
+ if (excessBlocks.size() == 0) {
+ excessReplicateMap.remove(node.getName());
+ }
+ }
}
/**
@@ -984,49 +1085,14 @@
/////////////////////////////////////////////////////////
/**
- * Return with a list of Blocks that should be invalidated
- * at the given node. Done in response to a file delete, which
- * eliminates a number of blocks from the universe.
- */
- public synchronized Block[] recentlyInvalidBlocks(UTF8 name) {
- Vector invalidateSet = (Vector) recentInvalidateSets.remove(name);
- if (invalidateSet == null) {
- return null;
- } else {
- return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
- }
- }
-
- /**
- * If the node has not been checked in some time, go through
- * its blocks and find which ones are neither valid nor pending.
- * It often happens that a client will start writing blocks and
- * then exit. The blocks are on-disk, but the file will be
- * abandoned.
- *
- * It's not enough to invalidate blocks at lease expiry time;
- * datanodes can go down before the client's lease on
- * the failed file expires and miss the "expire" event.
- *
- * This function considers every block on a datanode, and thus
- * should only be invoked infrequently.
+ * Check if there are any recently-deleted blocks a datanode should remove.
*/
- public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
- DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
- if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) {
- return null;
+ public synchronized Block[] blocksToInvalidate(UTF8 sender) {
+ Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);
+ if (invalidateSet != null) {
+ return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
} else {
- nodeInfo.updateObsoleteCheck();
- Vector obsolete = new Vector();
- for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
- Block b = (Block) it.next();
-
- if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
- LOG.info("Obsoleting block " + b);
- obsolete.add(b);
- }
- }
- return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
+ return null;
}
}
@@ -1066,7 +1132,6 @@
it.remove();
} else {
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
-
if (containingNodes.contains(srcNode)) {
DatanodeInfo targets[] = chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
if (targets.length > 0) {
@@ -1166,13 +1231,13 @@
if (forbidden1 != null) {
for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();
- forbiddenMachines.add(cur.getHost());
+ forbiddenMachines.add(cur.getName());
}
}
if (forbidden2 != null) {
for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();
- forbiddenMachines.add(cur.getHost());
+ forbiddenMachines.add(cur.getName());
}
}
@@ -1185,7 +1250,7 @@
DatanodeInfo node = (DatanodeInfo) it.next();
if ((forbidden1 == null || ! forbidden1.contains(node)) &&
(forbidden2 == null || ! forbidden2.contains(node)) &&
- (! forbiddenMachines.contains(node.getHost()))) {
+ (! forbiddenMachines.contains(node.getName()))) {
targetList.add(node);
totalRemaining += node.getRemaining();
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=240336&r1=240335&r2=240336&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Fri Aug 26 13:26:38 2005
@@ -262,9 +262,9 @@
namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
}
- public void blockReport(String sender, Block blocks[]) {
+ public Block[] blockReport(String sender, Block blocks[]) {
LOG.info("Block report from "+sender+": "+blocks.length+" blocks.");
- namesystem.processReport(blocks, new UTF8(sender));
+ return namesystem.processReport(blocks, new UTF8(sender));
}
public void blockReceived(String sender, Block blocks[]) {
@@ -281,7 +281,8 @@
}
/**
- * Return a block-oriented command for the datanode to execute
+ * Return a block-oriented command for the datanode to execute.
+ * This will be either a transfer or a delete operation.
*/
public BlockCommand getBlockwork(String sender, int xmitsInProgress) {
//
@@ -293,17 +294,16 @@
}
//
- // If none, check to see if there are blocks to invalidate
+ // If there are no transfers, check for recently-deleted blocks that
+ // should be removed. This is not a full-datanode sweep, as is done during
+ // a block report. This is just a small fast removal of blocks that have
+ // just been marked invalid.
//
- Block blocks[] = namesystem.recentlyInvalidBlocks(new UTF8(sender));
- if (blocks == null) {
- blocks = namesystem.checkObsoleteBlocks(new UTF8(sender));
- }
+ Block blocks[] = namesystem.blocksToInvalidate(new UTF8(sender));
if (blocks != null) {
return new BlockCommand(blocks);
}
-
- return new BlockCommand();
+ return null;
}
/**