Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/04 20:14:42 UTC
svn commit: r492695 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
src/webapps/dfs/ src/webapps/static/
Author: cutting
Date: Thu Jan 4 11:14:39 2007
New Revision: 492695
URL: http://svn.apache.org/viewvc?view=rev&rev=492695
Log:
HADOOP-681. Add to HDFS the ability to decommission nodes. Contributed by Dhruba.
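For orientation, the new client-side API this change introduces can be exercised roughly as follows. This is a minimal sketch based on the methods added in the diff below; the configuration setup and the datanode name are illustrative, and it assumes the default FileSystem is HDFS:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DistributedFileSystem;
    import org.apache.hadoop.dfs.FSConstants;
    import org.apache.hadoop.fs.FileSystem;

    public class DecommissionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        String[] nodes = { "datanode1.example.com:50010" }; // hypothetical name

        // Mark the node; its blocks are queued for re-replication.
        dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);

        // Poll until every block has enough replicas on other nodes.
        while (!dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET,
                                 nodes)) {
          Thread.sleep(5000L);
        }
      }
    }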
Added:
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp
lucene/hadoop/trunk/src/webapps/static/hadoop.css
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 4 11:14:39 2007
@@ -188,6 +188,10 @@
53. HADOOP-840. In task tracker, queue task cleanups and perform them
in a separate thread. (omalley & Mahadev Konar via cutting)
+54. HADOOP-681. Add to HDFS the ability to decommission nodes. This
+ causes their blocks to be re-replicated on other nodes, so that
+ they may be removed from a cluster. (Dhruba Borthakur via cutting)
+
Release 0.9.2 - 2006-12-15
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Thu Jan 4 11:14:39 2007
@@ -29,7 +29,7 @@
**********************************************************************/
interface ClientProtocol extends VersionedProtocol {
- public static final long versionID = 3L; // setSafeMode() added
+ public static final long versionID = 4L; // decommission node added
///////////////////////////////////////
// File contents
@@ -300,4 +300,6 @@
* @author Konstantin Shvachko
*/
public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException;
+
+ public boolean decommission( FSConstants.DecommissionAction action, String[] nodenames) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Thu Jan 4 11:14:39 2007
@@ -86,8 +86,6 @@
* @exception IOException if the filesystem does not exist.
*/
public void setSafeMode(String[] argv, int idx) throws IOException {
- final String safeModeUsage = "Usage: java DFSAdmin -safemode "
- + "[enter | leave | get]";
if (!(fs instanceof DistributedFileSystem)) {
System.out.println("FileSystem is " + fs.getName());
return;
@@ -134,6 +132,65 @@
}
/**
+ * Command related to the decommissioning of datanodes.
+ * Usage: java DFSAdmin -decommission [set | clear | get] [datanode1 [datanode2 ...]]
+ * @param argv List of command line parameters. Each of these items
+ * could be a hostname or a hostname:port pair.
+ * @param idx The index of the command that is being processed.
+ * @exception IOException if the filesystem does not exist.
+ * @return 0 on success, non zero on error.
+ */
+ public int decommission(String[] argv, int idx) throws IOException {
+ int exitCode = -1;
+
+ if (!(fs instanceof DistributedFileSystem)) {
+ System.out.println("FileSystem is " + fs.getName());
+ return exitCode;
+ }
+ if (idx >= argv.length - 1) {
+ printUsage("-decommission");
+ return exitCode;
+ }
+
+ //
+ // Copy all the datanode names to nodes[]
+ //
+ String[] nodes = new String[argv.length - idx - 1];
+ for (int i = idx + 1, j = 0; i < argv.length; i++, j++) {
+ nodes[j] = argv[i];
+ }
+
+ FSConstants.DecommissionAction action;
+
+ if ("set".equalsIgnoreCase(argv[idx])) {
+ action = FSConstants.DecommissionAction.DECOMMISSION_SET;
+ } else if ("clear".equalsIgnoreCase(argv[idx])) {
+ action = FSConstants.DecommissionAction.DECOMMISSION_CLEAR;
+ } else if ("get".equalsIgnoreCase(argv[idx])) {
+ action = FSConstants.DecommissionAction.DECOMMISSION_GET;
+ } else {
+ printUsage("-decommission");
+ return exitCode;
+ }
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ boolean mode = dfs.decommission(action, nodes);
+
+ if (action == FSConstants.DecommissionAction.DECOMMISSION_GET) {
+ if (mode) {
+ System.out.println("Node(s) has finished decommission");
+ }
+ else {
+ System.out.println("Node(s) have not yet been decommissioned");
+ }
+ return 0;
+ }
+ if (mode) {
+ return 0; // success
+ }
+ return exitCode;
+ }
+
+ /**
* Displays format of commands.
* @param cmd The command that is being executed.
*/
@@ -144,10 +201,15 @@
} else if ("-safemode".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-safemode enter | leave | get | wait]");
+ } else if ("-decommission".equals(cmd)) {
+ System.err.println("Usage: java DFSAdmin"
+ + " [-decommission set | clear | get "
+ + "[datanode1[, datanode2..]]");
} else {
System.err.println("Usage: java DFSAdmin");
System.err.println(" [-report]");
System.err.println(" [-safemode enter | leave | get | wait]");
+ System.err.println(" [-decommission set | clear | get]");
}
}
@@ -180,6 +242,11 @@
printUsage(cmd);
return exitCode;
}
+ } else if ("-decommission".equals(cmd)) {
+ if (argv.length < 2) {
+ printUsage(cmd);
+ return exitCode;
+ }
}
// initialize DFSAdmin
@@ -200,6 +267,8 @@
report();
} else if ("-safemode".equals(cmd)) {
setSafeMode(argv, i);
+ } else if ("-decommission".equals(cmd)) {
+ exitCode = decommission(argv, i);
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
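With the changes above, the administrator-facing flow looks like this (mirroring the printUsage() strings; the datanode names are illustrative):

    java DFSAdmin -decommission set datanode1:50010 datanode2:50010
    java DFSAdmin -decommission get datanode1:50010 datanode2:50010
    java DFSAdmin -decommission clear datanode1:50010

"set" queues the named datanodes' blocks for re-replication, "get" reports whether decommission has completed, and "clear" returns the nodes to normal service.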
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jan 4 11:14:39 2007
@@ -362,6 +362,18 @@
}
/**
+ * Set or clear the decommission state of datanode(s).
+ * See {@link ClientProtocol#decommission(FSConstants.DecommissionAction, String[])}
+ * for more details.
+ *
+ * @see ClientProtocol#decommission(FSConstants.DecommissionAction, String[])
+ */
+ public boolean decommission(DecommissionAction action, String[] nodes)
+ throws IOException {
+ return namenode.decommission(action, nodes);
+ }
+
+ /**
*/
public boolean mkdirs(UTF8 src) throws IOException {
checkOpen();
@@ -526,6 +538,14 @@
}
this.blocks = newBlocks;
this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+ }
+
+ /**
+ * Used by the automated tests to determine the block locations of
+ * a file.
+ */
+ synchronized DatanodeInfo[][] getDataNodes() {
+ return nodes;
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Thu Jan 4 11:14:39 2007
@@ -27,6 +27,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
/**
* DatanodeInfo represents the status of a DataNode.
@@ -42,8 +43,14 @@
protected long lastUpdate;
protected int xceiverCount;
+ // administrative states of a datanode
+ public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
+ protected AdminStates adminState;
+
+
DatanodeInfo() {
super();
+ adminState = null;
}
DatanodeInfo( DatanodeInfo from ) {
@@ -52,6 +59,7 @@
this.remaining = from.getRemaining();
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
+ this.adminState = from.adminState;
}
DatanodeInfo( DatanodeID nodeID ) {
@@ -60,6 +68,7 @@
this.remaining = 0L;
this.lastUpdate = 0L;
this.xceiverCount = 0;
+ this.adminState = null;
}
/** The raw capacity. */
@@ -101,6 +110,13 @@
long r = getRemaining();
long u = c - r;
buffer.append("Name: "+name+"\n");
+ if (isDecommissioned()) {
+ buffer.append("State : Decommissioned\n");
+ } else if (isDecommissionInProgress()) {
+ buffer.append("State : Decommission in progress\n");
+ } else {
+ buffer.append("State : In Service\n");
+ }
buffer.append("Total raw bytes: "+c+" ("+FsShell.byteDesc(c)+")"+"\n");
buffer.append("Used raw bytes: "+u+" ("+FsShell.byteDesc(u)+")"+"\n");
buffer.append("% used: "+FsShell.limitDecimal(((1.0*u)/c)*100,2)+"%"+"\n");
@@ -108,6 +124,72 @@
return buffer.toString();
}
+ /**
+ * Start decommissioning a node.
+ */
+ void startDecommission() {
+ adminState = AdminStates.DECOMMISSION_INPROGRESS;
+ }
+
+ /**
+ * Stop decommissioning a node, returning it to the NORMAL state.
+ */
+ void stopDecommission() {
+ adminState = null;
+ }
+
+ /**
+ * Returns true if the node is in the process of being decommissioned
+ */
+ boolean isDecommissionInProgress() {
+ return adminState == AdminStates.DECOMMISSION_INPROGRESS;
+ }
+
+ /**
+ * Returns true if the node has been decommissioned.
+ */
+ boolean isDecommissioned() {
+ return adminState == AdminStates.DECOMMISSIONED;
+ }
+
+ /**
+ * Sets the admin state to indicate that decommission is complete.
+ */
+ void setDecommissioned() {
+ assert isDecommissionInProgress();
+ adminState = AdminStates.DECOMMISSIONED;
+ }
+
+ /**
+ * Retrieves the admin state of this node.
+ */
+ AdminStates getAdminState() {
+ if (adminState == null) {
+ return AdminStates.NORMAL;
+ }
+ return adminState;
+ }
+
+ /**
+ * Sets the admin state of this node.
+ */
+ void setAdminState(AdminStates newState) {
+ if (newState == AdminStates.NORMAL) {
+ adminState = null;
+ }
+ else {
+ adminState = newState;
+ }
+ }
+
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
@@ -127,6 +209,7 @@
out.writeLong(remaining);
out.writeLong(lastUpdate);
out.writeInt(xceiverCount);
+ WritableUtils.writeEnum(out, getAdminState());
}
/**
@@ -137,5 +220,8 @@
this.remaining = in.readLong();
this.lastUpdate = in.readLong();
this.xceiverCount = in.readInt();
+ AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
+ AdminStates.class);
+ setAdminState(newState);
}
}
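The new adminState field travels over the wire via WritableUtils. A minimal round-trip sketch, assuming the standard org.apache.hadoop.io buffer classes (the class name here is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class AdminStateRoundTrip {
      public static void main(String[] args) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        WritableUtils.writeEnum(out, DatanodeInfo.AdminStates.DECOMMISSION_INPROGRESS);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        DatanodeInfo.AdminStates state = (DatanodeInfo.AdminStates)
            WritableUtils.readEnum(in, DatanodeInfo.AdminStates.class);
        System.out.println(state); // DECOMMISSION_INPROGRESS
      }
    }

Note that a node that was never decommissioned serializes as NORMAL: getAdminState() maps the internal null to AdminStates.NORMAL before writeEnum is called.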
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu Jan 4 11:14:39 2007
@@ -281,7 +281,7 @@
return used;
}
- /** Return statistics for each datanode.*/
+ /** Return statistics for each datanode. */
public DatanodeInfo[] getDataNodeStats() throws IOException {
return dfs.datanodeReport();
}
@@ -294,5 +294,14 @@
public boolean setSafeMode( FSConstants.SafeModeAction action )
throws IOException {
return dfs.setSafeMode( action );
+ }
+
+ /**
+ * Set, clear decommission of a set of datanodes.
+ */
+ public boolean decommission(FSConstants.DecommissionAction action,
+ String[] nodes)
+ throws IOException {
+ return dfs.decommission(action, nodes);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Jan 4 11:14:39 2007
@@ -122,6 +122,9 @@
// SafeMode actions
public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+ // decommission administrative actions
+ public enum DecommissionAction{ DECOMMISSION_SET, DECOMMISSION_CLEAR, DECOMMISSION_GET; }
+
// Version is reflected in the dfs image and edit log files.
// Version is reflected in the data storage file.
// Versions are negative.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Thu Jan 4 11:14:39 2007
@@ -258,8 +258,9 @@
if( logVersion > -3 )
throw new IOException("Unexpected opcode " + opcode
+ " for version " + logVersion );
- DatanodeDescriptor node = new DatanodeDescriptor();
- node.readFields(in);
+ FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+ nodeimage.readFields(in);
+ DatanodeDescriptor node = nodeimage.getDatanodeDescriptor();
fsNamesys.unprotectedAddDatanode( node );
break;
}
@@ -376,7 +377,7 @@
* registration event.
*/
void logAddDatanode( DatanodeDescriptor node ) {
- logEdit( OP_DATANODE_ADD, node, null );
+ logEdit( OP_DATANODE_ADD, new FSImage.DatanodeImage(node), null );
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Thu Jan 4 11:14:39 2007
@@ -74,7 +74,7 @@
}
this.editLog = new FSEditLog( edits );
}
-
+
FSEditLog getEditLog() {
return editLog;
}
@@ -344,7 +344,7 @@
}
}
- class DatanodeImage implements WritableComparable {
+ static class DatanodeImage implements WritableComparable {
/**************************************************
* DatanodeImage is used to store persistent information
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 4 11:14:39 2007
@@ -717,7 +717,10 @@
// the blocks.
for (int i = 0; i < nrBlocks; i++) {
SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
- if (containingNodes.size() < pendingFile.getReplication()) {
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes(containingNodes);
+
+ if (numCurrentReplica < pendingFile.getReplication()) {
NameNode.stateChangeLog.debug(
"DIR* NameSystem.completeFile:"
+ pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
@@ -1585,20 +1588,25 @@
FSDirectory.INode fileINode = dir.getFileByBlock(block);
if( fileINode == null ) // block does not belong to any file
return;
+
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes(containingNodes);
+
// check whether safe replication is reached for the block
// only if it is a part of a files
- incrementSafeBlockCount( containingNodes.size() );
+ incrementSafeBlockCount( numCurrentReplica );
short fileReplication = fileINode.getReplication();
- if (containingNodes.size() >= fileReplication ) {
+ if (numCurrentReplica >= fileReplication ) {
neededReplications.remove(block);
pendingReplications.remove(block);
NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
- +block.getBlockName()+" has "+containingNodes.size()
+ +block.getBlockName()+" has "+ numCurrentReplica
+" replicas so is removed from neededReplications and pendingReplications" );
- } else {// containingNodes.size() < fileReplication
+
+ } else {// numCurrentReplica < fileReplication
neededReplications.add(block);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
- +block.getBlockName()+" has only "+containingNodes.size()
+ +block.getBlockName()+" has only "+ numCurrentReplica
+" replicas so is added to neededReplications" );
}
@@ -1620,7 +1628,9 @@
DatanodeDescriptor cur = it.next();
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
if (excessBlocks == null || ! excessBlocks.contains(block)) {
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
nonExcess.add(cur);
+ }
}
}
chooseExcessReplicates(nonExcess, block, replication);
@@ -1811,6 +1821,145 @@
}
}
}
+
+ /**
+ * Start decommissioning the specified datanodes. If a datanode is
+ * already being decommissioned, then this is a no-op.
+ */
+ public synchronized void startDecommission (String[] nodes)
+ throws IOException {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot decommission node ", safeMode);
+ }
+ boolean isError = false;
+ String badnodes = "";
+
+ synchronized (datanodeMap) {
+ for (int i = 0; i < nodes.length; i++) {
+ boolean found = false;
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+
+ //
+ // If this is a node that we are interested in, set its admin state.
+ //
+ if (node.getName().equals(nodes[i]) ||
+ node.getHost().equals(nodes[i])) {
+ found = true;
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ LOG.info("Start Decommissioning node " + node.name);
+ node.startDecommission();
+ //
+ // all the blocks that reside on this node have to be
+ // re-replicated.
+ //
+ Block decommissionBlocks[] = node.getBlocks();
+ for (int j = 0; j < decommissionBlocks.length; j++) {
+ synchronized (neededReplications) {
+ neededReplications.add(decommissionBlocks[j]);
+ }
+ }
+ }
+ break;
+ }
+ }
+ //
+ // Record the fact that a specified node was not found
+ //
+ if (!found) {
+ badnodes += nodes[i] + " ";
+ isError = true;
+ }
+ }
+ }
+ if (isError) {
+ throw new IOException("Nodes " + badnodes + " not found");
+ }
+ }
+
+ /**
+ * Stop decommissioning the specified datanodes.
+ */
+ public synchronized void stopDecommission (String[] nodes)
+ throws IOException {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot decommission node ", safeMode);
+ }
+ boolean isError = false;
+ String badnodes = "";
+
+ synchronized (datanodeMap) {
+ for (int i = 0; i < nodes.length; i++) {
+ boolean found = false;
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+
+ //
+ // If this is a node that we are interested in, set its admin state.
+ //
+ if (node.getName().equals(nodes[i]) ||
+ node.getHost().equals(nodes[i])) {
+ LOG.info("Stop Decommissioning node " + node.name);
+ found = true;
+ node.stopDecommission();
+ break;
+ }
+ }
+ //
+ // Record the fact that a specified node was not found
+ //
+ if (!found) {
+ badnodes += nodes[i] + " ";
+ isError = true;
+ }
+ }
+ }
+ if (isError) {
+ throw new IOException("Nodes " + badnodes + " not found");
+ }
+ }
+
+ /**
+ * Return true if all specified nodes are decommissioned.
+ * Otherwise return false.
+ */
+ public synchronized boolean checkDecommissioned (String[] nodes)
+ throws IOException {
+ String badnodes = "";
+ boolean isError = false;
+
+ synchronized (datanodeMap) {
+ for (int i = 0; i < nodes.length; i++) {
+ boolean found = false;
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+
+ //
+ // If this is a node that we are interested in, check its admin state.
+ //
+ if (node.getName().equals(nodes[i]) ||
+ node.getHost().equals(nodes[i])) {
+ found = true;
+ boolean isDecommissioned = checkDecommissionStateInternal(node);
+ if (!isDecommissioned) {
+ return false;
+ }
+ }
+ }
+ if (!found) {
+ badnodes += nodes[i] + " ";
+ isError = true;
+ }
+ }
+ }
+ if (isError) {
+ throw new IOException("Nodes " + badnodes + " not found");
+ }
+ return true;
+ }
+
/**
*/
public DatanodeInfo getDataNodeInfo(String name) {
@@ -1896,6 +2045,72 @@
return (Block[]) sendBlock.toArray(new Block[sendBlock.size()]);
}
+ /*
+ * Counts the number of nodes in the given list. Skips over nodes
+ * that are marked for decommission.
+ */
+ private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
+ int count = 0;
+ for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /*
+ * Returns true if any block in neededReplications still
+ * resides on the specified node; otherwise returns false.
+ */
+ private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+ synchronized (neededReplications) {
+ for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
+ Block block = it.next();
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ if (containingNodes.contains(srcNode)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Change, if appropriate, the admin state of a datanode to
+ * decommission completed. Return true if decommission is complete.
+ */
+ private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+ //
+ // Check to see if there are any blocks in the neededReplications
+ // data structure that have a replica on the node being decommissioned.
+ //
+ if (node.isDecommissionInProgress()) {
+ if (!isReplicationInProgress(node)) {
+ node.setDecommissioned();
+ LOG.info("Decommission complete for node " + node.name);
+ }
+ }
+ return node.isDecommissioned();
+ }
+
+ /**
+ * Change, if appropriate, the admin state of a datanode to
+ * decommission completed.
+ */
+ public synchronized void checkDecommissionState(DatanodeID nodeReg) {
+ DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID());
+ if (node == null) {
+ return;
+ }
+ checkDecommissionStateInternal(node);
+ }
+
/**
* Return with a list of Block/DataNodeInfo sets, indicating
* where various Blocks should be copied, ASAP.
@@ -1924,6 +2139,7 @@
// replicate them.
//
List<Block> replicateBlocks = new ArrayList<Block>();
+ List<Integer> numCurrentReplicas = new ArrayList<Integer>();
List<DatanodeDescriptor[]> replicateTargetSets;
replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
@@ -1943,17 +2159,23 @@
Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
Collection<Block> excessBlocks = excessReplicateMap.get(
srcNode.getStorageID() );
+
// srcNode must contain the block, and the block must
// not be scheduled for removal on that node
if (containingNodes != null && containingNodes.contains(srcNode)
&& (excessBlocks == null || ! excessBlocks.contains(block))) {
+
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes(containingNodes);
+
DatanodeDescriptor targets[] = chooseTargets(
- Math.min( fileINode.getReplication() - containingNodes.size(),
+ Math.min( fileINode.getReplication() - numCurrentReplica,
this.maxReplicationStreams - xmitsInProgress),
containingNodes, null, blockSize);
if (targets.length > 0) {
// Build items to return
replicateBlocks.add(block);
+ numCurrentReplicas.add(new Integer(numCurrentReplica));
replicateTargetSets.add(targets);
scheduledXfers += targets.length;
}
@@ -1973,9 +2195,10 @@
Block block = it.next();
DatanodeDescriptor targets[] =
(DatanodeDescriptor[]) replicateTargetSets.get(i);
+ int numCurrentReplica = numCurrentReplicas.get(i).intValue();
Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if (containingNodes.size() + targets.length >=
+ if (numCurrentReplica + targets.length >=
dir.getFileByBlock( block).getReplication() ) {
neededReplications.remove(block);
pendingReplications.add(block);
@@ -2060,7 +2283,8 @@
it.hasNext();) {
DatanodeDescriptor node = it.next();
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
- clientMachine.toString().equals(node.getHost())) {
+ clientMachine.toString().equals(node.getHost()) &&
+ !node.isDecommissionInProgress() && !node.isDecommissioned()) {
if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
(node.getXceiverCount() <= (2.0 * avgLoad))) {
targets.add(node);
@@ -2084,6 +2308,7 @@
DatanodeDescriptor node = heartbeats.get(idx);
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
!targets.contains(node) &&
+ !node.isDecommissionInProgress() && !node.isDecommissioned() &&
(node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
(node.getXceiverCount() <= (2.0 * avgLoad))) {
target = node;
@@ -2100,6 +2325,7 @@
DatanodeDescriptor node = heartbeats.get(idx);
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
!targets.contains(node) &&
+ !node.isDecommissionInProgress() && !node.isDecommissioned() &&
node.getRemaining() >= blockSize) {
target = node;
break;
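To make the replica arithmetic above concrete, a worked example under the new counting rules:

    // Worked example (values illustrative):
    //   fileReplication           = 3
    //   containingNodes           = { A, B, C }, with C in DECOMMISSION_INPROGRESS
    //   countContainingNodes(...) = 2   (C is skipped)
    //   targets requested         = min(3 - 2, maxReplicationStreams - xmitsInProgress)
    // The replica on C still exists and is still readable; it simply no
    // longer counts toward the target replication, so one extra copy is
    // scheduled elsewhere.

This is also why TestDecommission (below) expects repl+1 replicas for blocks that had a copy on the decommissioned node.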
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Jan 4 11:14:39 2007
@@ -465,6 +465,27 @@
boolean isInSafeMode() {
return namesystem.isInSafeMode();
}
+
+ /**
+ * Set administrative commands to decommission datanodes.
+ */
+ public boolean decommission(DecommissionAction action, String[] nodes)
+ throws IOException {
+ boolean ret = true;
+ switch (action) {
+ case DECOMMISSION_SET: // decommission datanode(s)
+ namesystem.startDecommission(nodes);
+ break;
+ case DECOMMISSION_CLEAR: // remove decommission state of a datanode
+ namesystem.stopDecommission(nodes);
+ break;
+ case DECOMMISSION_GET: // are all the nodes decommissioned?
+ ret = namesystem.checkDecommissioned(nodes);
+ break;
+ }
+ return ret;
+ }
+
////////////////////////////////////////////////////////////////
// DatanodeProtocol
@@ -513,6 +534,14 @@
if (blocks != null) {
return new BlockCommand(blocks);
}
+ //
+ // See if the decommissioning node has finished re-replicating
+ // all of its blocks to other nodes. This is a loose heuristic
+ // to determine when a decommission is really over. We could
+ // probably do this in a separate thread rather than making the
+ // heartbeat thread do it.
+ //
+ namesystem.checkDecommissionState(nodeReg);
return null;
}
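The comment above floats moving this check off the heartbeat path; a hypothetical sketch of that alternative (not part of this commit — datanodeMap and checkDecommissionStateInternal are the FSNamesystem members shown earlier, and the polling interval is arbitrary):

    // Inside FSNamesystem: a background monitor instead of the heartbeat hook.
    Thread decommissionMonitor = new Thread(new Runnable() {
      public void run() {
        while (true) {
          synchronized (datanodeMap) {
            for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
                 it.hasNext(); ) {
              checkDecommissionStateInternal(it.next());
            }
          }
          try {
            Thread.sleep(30 * 1000L); // arbitrary polling interval
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    decommissionMonitor.setDaemon(true);
    decommissionMonitor.start();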
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=auto&rev=492695
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Thu Jan 4 11:14:39 2007
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the decommissioning of nodes.
+ * @author Dhruba Borthakur
+ */
+public class TestDecommission extends TestCase {
+ static final long seed = 0xDEADBEEFL;
+ static final int blockSize = 8192;
+ static final int fileSize = 16384;
+ static final int numDatanodes = 4;
+
+ Random myrand = new Random();
+
+ private void writeFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ // create and write a file that contains two blocks of data
+ FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
+ (long)blockSize);
+ byte[] buffer = new byte[fileSize];
+ Random rand = new Random(seed);
+ rand.nextBytes(buffer);
+ stm.write(buffer);
+ stm.close();
+ }
+
+
+ private void checkFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+ for (int idx = 0; idx < locations.length; idx++) {
+ assertEquals("Number of replicas for block" + idx,
+ Math.min(numDatanodes, repl), locations[idx].length);
+ }
+ }
+
+ /**
+ * For blocks that reside on the nodes that are down, verify that their
+ * replication factor is 1 more than the specified one.
+ */
+ private void checkFile(FileSystem fileSys, Path name, int repl,
+ String[] downnodes) throws IOException {
+ FSInputStream is = fileSys.openRaw(name);
+ DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is;
+ DatanodeInfo[][] dinfo = dis.getDataNodes();
+
+ for (int blk = 0; blk < dinfo.length; blk++) { // for each block
+ int hasdown = 0;
+ DatanodeInfo[] nodes = dinfo[blk];
+ for (int j = 0; j < nodes.length; j++) { // for each replica
+ for (int k = 0; downnodes != null && k < downnodes.length; k++) {
+ if (nodes[j].getName().equals(downnodes[k])) {
+ hasdown++;
+ System.out.println("Block " + blk + " replica " +
+ nodes[j].getName() + " is decommissioned.");
+ }
+ }
+ }
+ System.out.println("Block " + blk + " has " + hasdown +
+ " decommissioned replica.");
+ assertEquals("Number of replicas for block" + blk,
+ Math.min(numDatanodes, repl+hasdown), nodes.length);
+ }
+ }
+
+ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+ assertTrue(fileSys.exists(name));
+ fileSys.delete(name);
+ assertTrue(!fileSys.exists(name));
+ }
+
+ private void printDatanodeReport(DatanodeInfo[] info) {
+ System.out.println("-------------------------------------------------");
+ for (int i = 0; i < info.length; i++) {
+ System.out.println(info[i].getDatanodeReport());
+ System.out.println();
+ }
+ }
+
+ /*
+ * decommission one random node.
+ */
+ private String[] decommissionNode(DFSClient client, FileSystem filesys)
+ throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+ DatanodeInfo[] info = client.datanodeReport();
+
+ //
+ // pick one datanode randomly.
+ //
+ int index = myrand.nextInt(info.length);
+ String nodename = info[index].getName();
+ System.out.println("Decommissioning node: " + nodename);
+ String[] nodes = new String[1];
+ nodes[0] = nodename;
+ dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);
+ return nodes;
+ }
+
+ /*
+ * put node back in action
+ */
+ private void commissionNode(DFSClient client, FileSystem filesys,
+ String[] nodes) throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+ DatanodeInfo[] info = client.datanodeReport();
+
+ for (int i = 0; i < nodes.length; i++) {
+ System.out.println("Putting node back in action: " + nodes[i]);
+ }
+ dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes);
+ }
+
+ /*
+ * Check that node(s) were decommissioned
+ */
+ private void checkNodeDecommission(DFSClient client, FileSystem filesys,
+ String[] nodes) throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+ boolean ret = dfs.decommission(
+ FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
+ assertEquals("State of Decommissioned Datanode(s) ", ret, true);
+ }
+
+ /*
+ * Wait till node is fully decommissioned.
+ */
+ private void waitNodeDecommission(DFSClient client, FileSystem filesys,
+ String[] nodes) throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) filesys;
+ boolean done = dfs.decommission(
+ FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
+ while (!done) {
+ System.out.println("Waiting for nodes " + nodes[0] +
+ " to be fully decommissioned...");
+ try {
+ Thread.sleep(5000L);
+ } catch (InterruptedException e) {
+ // nothing
+ }
+ done = dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET,
+ nodes);
+ }
+ //
+ // sleep an additional 10 seconds for the block reports from the datanodes
+ // to arrive.
+ //
+ try {
+ Thread.sleep(10 * 1000L);
+ } catch (Exception e) {
+ }
+ }
+
+ /**
+ * Tests Decommission in DFS.
+ */
+ public void testDecommission() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+ // Now wait for 15 seconds to give datanodes chance to register
+ // themselves and to report heartbeat
+ try {
+ Thread.sleep(15000L);
+ } catch (InterruptedException e) {
+ // nothing
+ }
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ cluster.getNameNodePort());
+ DFSClient client = new DFSClient(addr, conf);
+ DatanodeInfo[] info = client.datanodeReport();
+ assertEquals("Number of Datanodes ", numDatanodes, info.length);
+ FileSystem fileSys = cluster.getFileSystem();
+ DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+
+ try {
+ for (int iteration = 0; iteration < 2; iteration++) {
+ //
+ // Decommission one node. Verify that the node is decommissioned.
+ // Verify that the replication factor of the file has increased from 3
+ // to 4, which means one replica still sits on the decommissioned node.
+ //
+ Path file1 = new Path("smallblocktest.dat");
+ writeFile(fileSys, file1, 3);
+ checkFile(fileSys, file1, 3);
+ String downnodes[] = decommissionNode(client, fileSys);
+ waitNodeDecommission(client, fileSys, downnodes);
+ checkFile(fileSys, file1, 3, downnodes);
+ commissionNode(client, fileSys, downnodes);
+ cleanupFile(fileSys, file1);
+ }
+ } catch (IOException e) {
+ info = client.datanodeReport();
+ printDatanodeReport(info);
+ throw e;
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ }
+ }
+}
Modified: lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp Thu Jan 4 11:14:39 2007
@@ -82,9 +82,15 @@
percentUsed = FsShell.limitDecimal(((1.0 * u)/c)*100, 2);
else
percentUsed = "100";
+
+ String adminState = (d.isDecommissioned() ? "Decommissioned" :
+ (d.isDecommissionInProgress() ? "Decommission In Progress":
+ "In Service"));
out.print("<td class=\"lastcontact\"> " +
((currentTime - d.getLastUpdate())/1000) +
+ "<td class=\"adminstate\">" +
+ adminState +
"<td class=\"size\">" +
FsShell.limitDecimal(c*1.0/diskBytes, 2) +
"<td class=\"pcused\">" + percentUsed +
@@ -167,6 +173,7 @@
out.print( "<tr class=\"headerRow\"> <th " +
NodeHeaderStr("name") + "> Node <th " +
NodeHeaderStr("lastcontact") + "> Last Contact <th " +
+ NodeHeaderStr("adminstate") + "> Admin State <th " +
NodeHeaderStr("size") + "> Size (" + diskByteStr +
") <th " + NodeHeaderStr("pcused") +
"> Used (%) <th " + NodeHeaderStr("blocks") +
Modified: lucene/hadoop/trunk/src/webapps/static/hadoop.css
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/static/hadoop.css?view=diff&rev=492695&r1=492694&r2=492695
==============================================================================
--- lucene/hadoop/trunk/src/webapps/static/hadoop.css (original)
+++ lucene/hadoop/trunk/src/webapps/static/hadoop.css Thu Jan 4 11:14:39 2007
@@ -41,7 +41,7 @@
cursor : pointer;
}
-div#dfsnodetable td.blocks, td.size, td.pcused, td.lastcontact {
+div#dfsnodetable td.blocks, td.size, td.pcused, td.adminstate, td.lastcontact {
text-align : right;
}