You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/26 22:22:37 UTC
svn commit: r468115 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/
Author: cutting
Date: Thu Oct 26 13:22:36 2006
New Revision: 468115
URL: http://svn.apache.org/viewvc?view=rev&rev=468115
Log:
HADOOP-641. Change NameNode to request a fresh block report from re-discovered DataNodes. Contributed by Konstantin.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 26 13:22:36 2006
@@ -62,6 +62,10 @@
as the source for file copy and move commands.
(Dhruba Borthakur via cutting)
+17. HADOOP-641. Change NameNode to request a fresh block report from
+ a re-discovered DataNode, so that no-longer-needed replications
+ are stopped promptly. (Konstantin Shvachko via cutting)
+
Release 0.7.2 - 2006-10-18
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Thu Oct 26 13:22:36 2006
@@ -37,54 +37,43 @@
public Writable newInstance() { return new BlockCommand(); }
});
}
-
- boolean transferBlocks = false;
- boolean invalidateBlocks = false;
- boolean shutdown = false;
+
+ DatanodeProtocol.DataNodeAction action;
Block blocks[];
DatanodeInfo targets[][];
public BlockCommand() {
- this.transferBlocks = false;
- this.invalidateBlocks = false;
- this.shutdown = false;
- this.blocks = new Block[0];
- this.targets = new DatanodeInfo[0][];
+ this.action = DatanodeProtocol.DataNodeAction.DNA_UNKNOWN;
+ this.blocks = new Block[0];
+ this.targets = new DatanodeInfo[0][];
}
+ /**
+ * Create BlockCommand for transferring blocks to another datanode
+ * @param blocks blocks to be transferred
+ * @param targets nodes to transfer
+ */
public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
- this.transferBlocks = true;
- this.invalidateBlocks = false;
- this.shutdown = false;
- this.blocks = blocks;
- this.targets = targets;
+ this.action = DatanodeProtocol.DataNodeAction.DNA_TRANSFER;
+ this.blocks = blocks;
+ this.targets = targets;
}
+ /**
+ * Create BlockCommand for block invalidation
+ * @param blocks blocks to invalidate
+ */
public BlockCommand(Block blocks[]) {
- this.transferBlocks = false;
- this.invalidateBlocks = true;
- this.shutdown = false;
- this.blocks = blocks;
- this.targets = new DatanodeInfo[0][];
+ this.action = DatanodeProtocol.DataNodeAction.DNA_INVALIDATE;
+ this.blocks = blocks;
+ this.targets = new DatanodeInfo[0][];
}
- public BlockCommand( boolean doShutdown ) {
+ public BlockCommand( DatanodeProtocol.DataNodeAction action ) {
this();
- this.shutdown = doShutdown;
- }
-
- public boolean transferBlocks() {
- return transferBlocks;
+ this.action = action;
}
- public boolean invalidateBlocks() {
- return invalidateBlocks;
- }
-
- public boolean shutdownNode() {
- return shutdown;
- }
-
public Block[] getBlocks() {
return blocks;
}
@@ -97,8 +86,7 @@
// Writable
///////////////////////////////////////////
public void write(DataOutput out) throws IOException {
- out.writeBoolean(transferBlocks);
- out.writeBoolean(invalidateBlocks);
+ WritableUtils.writeEnum( out, action );
out.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
blocks[i].write(out);
@@ -113,8 +101,8 @@
}
public void readFields(DataInput in) throws IOException {
- this.transferBlocks = in.readBoolean();
- this.invalidateBlocks = in.readBoolean();
+ this.action = (DatanodeProtocol.DataNodeAction)
+ WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
this.blocks = new Block[in.readInt()];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = new Block();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Oct 26 13:22:36 2006
@@ -267,7 +267,17 @@
* @throws IOException
*/
private void register() throws IOException {
- dnRegistration = namenode.register( dnRegistration );
+ while( true ) {
+ try {
+ dnRegistration = namenode.register( dnRegistration );
+ break;
+ } catch( SocketTimeoutException e ) { // namenode is busy
+ LOG.info("Problem connecting to server: " + getNameNodeAddr());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
+ }
+ }
if( storage.getStorageID().equals("") ) {
storage.setStorageID( dnRegistration.getStorageID());
storage.writeAll();
@@ -350,29 +360,14 @@
if( cmd != null ) {
data.checkDataDir();
- if (cmd.transferBlocks()) {
+ switch( cmd.action ) {
+ case DNA_TRANSFER:
//
// Send a copy of a block to another datanode
//
- Block blocks[] = cmd.getBlocks();
- DatanodeInfo xferTargets[][] = cmd.getTargets();
-
- for (int i = 0; i < blocks.length; i++) {
- if (!data.isValidBlock(blocks[i])) {
- String errStr = "Can't send invalid block " + blocks[i];
- LOG.info(errStr);
- namenode.errorReport( dnRegistration,
- DatanodeProtocol.INVALID_BLOCK,
- errStr);
- break;
- } else {
- if (xferTargets[i].length > 0) {
- LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
- new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
- }
- }
- }
- } else if (cmd.invalidateBlocks()) {
+ transferBlocks( cmd.getBlocks(), cmd.getTargets() );
+ break;
+ case DNA_INVALIDATE:
//
// Some local block(s) are obsolete and can be
// safely garbage-collected.
@@ -380,10 +375,17 @@
Block toDelete[] = cmd.getBlocks();
data.invalidate(toDelete);
myMetrics.removedBlocks(toDelete.length);
- } else if( cmd.shutdownNode()) {
+ break;
+ case DNA_SHUTDOWN:
// shut down the data node
this.shutdown();
continue;
+ case DNA_REPORT:
+ // namenode requested a block report; sending
+ lastBlockReport = 0;
+ break;
+ default:
+ LOG.warn( "Unknown BlockCommand action: " + cmd.action);
}
}
}
@@ -455,7 +457,24 @@
} // while (shouldRun)
} // offerService
-
+ private void transferBlocks( Block blocks[],
+ DatanodeInfo xferTargets[][]
+ ) throws IOException {
+ for (int i = 0; i < blocks.length; i++) {
+ if (!data.isValidBlock(blocks[i])) {
+ String errStr = "Can't send invalid block " + blocks[i];
+ LOG.info(errStr);
+ namenode.errorReport( dnRegistration,
+ DatanodeProtocol.INVALID_BLOCK,
+ errStr );
+ break;
+ }
+ if (xferTargets[i].length > 0) {
+ LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+ new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+ }
+ }
+ }
/**
* Server used for receiving/sending a block of data.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Thu Oct 26 13:22:36 2006
@@ -29,12 +29,23 @@
* @author Michael Cafarella
**********************************************************************/
interface DatanodeProtocol extends VersionedProtocol {
- public static final long versionID = 2L; // infoPort added to DatanodeID
- // affected: DatanodeRegistration
+ public static final long versionID = 3L; // BlockCommand.action replaced boolean members
+ // affected: BlockCommand
// error code
final static int DISK_ERROR = 1;
final static int INVALID_BLOCK = 2;
+
+ /**
+ * Determines actions that data node should perform
+ * when receiving a block command.
+ */
+ public enum DataNodeAction{ DNA_UNKNOWN, // unknown action
+ DNA_TRANSFER, // transfer blocks to another datanode
+ DNA_INVALIDATE, // invalidate blocks
+ DNA_SHUTDOWN, // shutdown node
+ DNA_REPORT; } // send block report to the namenode
+
/**
* Register Datanode.
*
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct 26 13:22:36 2006
@@ -1297,14 +1297,23 @@
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
* 2) Adjust usage stats for future block allocation
+ *
+ * If a substantial amount of time passed since the last datanode
+ * heartbeat then request an immediate block report.
+ *
+ * @return true if block report is required or false otherwise.
+ * @throws IOException
*/
- public synchronized void gotHeartbeat(DatanodeID nodeID,
- long capacity,
- long remaining,
- int xceiverCount) throws IOException {
+ public synchronized boolean gotHeartbeat( DatanodeID nodeID,
+ long capacity,
+ long remaining,
+ int xceiverCount
+ ) throws IOException {
+ boolean needBlockReport;
synchronized (heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = getDatanode( nodeID );
+ needBlockReport = nodeinfo.isDead();
if (nodeinfo == null)
// We do not accept unregistered guests
@@ -1314,6 +1323,7 @@
addHeartbeat(nodeinfo);
}
}
+ return needBlockReport;
}
/**
@@ -1902,6 +1912,9 @@
"BLOCK* NameSystem.pendingTransfer: " + "ask "
+ srcNode.getName() + " to replicate "
+ block.getBlockName() + " to " + targetList);
+ NameNode.stateChangeLog.debug(
+ "BLOCK* neededReplications = " + neededReplications.size()
+ + " pendingReplications = " + pendingReplications.size() );
}
}
@@ -2280,7 +2293,7 @@
*/
synchronized boolean isOn() {
try {
- isConsistent(); // SHV this an assert
+ isConsistent(); // SHV this is an assert
} catch( IOException e ) {
System.err.print( StringUtils.stringifyException( e ));
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Oct 26 13:22:36 2006
@@ -464,7 +464,10 @@
int xmitsInProgress,
int xceiverCount) throws IOException {
verifyRequest( nodeReg );
- namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount );
+ if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount )) {
+ // request block report from the datanode
+ return new BlockCommand( DataNodeAction.DNA_REPORT );
+ }
//
// Ask to perform pending transfers, if any