Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/01/19 00:24:11 UTC
svn commit: r1060619 - in /hadoop/hdfs/trunk: ./
src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/...
Author: suresh
Date: Tue Jan 18 23:24:10 2011
New Revision: 1060619
URL: http://svn.apache.org/viewvc?rev=1060619&view=rev
Log:
HDFS-1547. Improve decommission mechanism. Contributed by Suresh Srinivas.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jan 18 23:24:10 2011
@@ -36,6 +36,8 @@ Trunk (unreleased changes)
HDFS-1539. A config option for the datanode to fsync a block file
when block is completely written. (dhruba)
+ HDFS-1547. Improve decommission mechanism. (suresh)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Tue Jan 18 23:24:10 2011
@@ -209,14 +209,16 @@
</li>
<li>
<code>-refreshNodes</code>
- : Updates the set of hosts allowed to connect to namenode.
- Re-reads the config file to update values defined by dfs.hosts and
- dfs.host.exclude and reads the entires (hostnames) in those files.
- Each entry not defined in dfs.hosts but in dfs.hosts.exclude
- is decommissioned. Each entry defined in dfs.hosts and also in
- dfs.host.exclude is stopped from decommissioning if it has aleady
- been marked for decommission. Entires not present in both the lists
- are decommissioned.
+ : Updates the namenode with the set of datanodes allowed to
+ connect to the namenode. The namenode re-reads datanode hostnames
+ from the files defined by dfs.hosts and dfs.hosts.exclude. Hosts
+ defined in dfs.hosts are the datanodes that are part of the cluster.
+ If dfs.hosts contains any entries, only the hosts listed there are
+ allowed to register with the namenode. Entries in dfs.hosts.exclude
+ are datanodes that need to be decommissioned. A datanode completes
+ decommissioning when all of its replicas have been replicated
+ to other datanodes. Decommissioned nodes are not automatically
+ shut down and are not chosen for writing new replicas.
</li>
<li>
<code>-printTopology</code>
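For illustration, a minimal sketch (not part of this commit; file paths are hypothetical) of how the include/exclude files are configured and how the refresh is triggered from Java rather than from the command line:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RefreshNodesExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical paths: dfs.hosts lists datanodes allowed to register,
        // dfs.hosts.exclude lists datanodes to be decommissioned.
        conf.set("dfs.hosts", "/etc/hadoop/dfs.hosts");
        conf.set("dfs.hosts.exclude", "/etc/hadoop/dfs.hosts.exclude");
        // Equivalent to running "hadoop dfsadmin -refreshNodes".
        System.exit(ToolRunner.run(new DFSAdmin(conf),
            new String[] {"-refreshNodes"}));
      }
    }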
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Jan 18 23:24:10 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.io.UnsupportedEncodingException;
+import java.util.Comparator;
import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,20 @@ import org.apache.hadoop.net.NodeBase;
@InterfaceAudience.Private
public class DFSUtil {
/**
+ * Comparator for sorting DatanodeInfo[] based on decommissioned states.
+ * Decommissioned nodes are moved to the end of the array on sorting with
+ * this comparator.
+ */
+ public static final Comparator<DatanodeInfo> DECOM_COMPARATOR =
+ new Comparator<DatanodeInfo>() {
+ @Override
+ public int compare(DatanodeInfo a, DatanodeInfo b) {
+ return a.isDecommissioned() == b.isDecommissioned() ? 0 :
+ a.isDecommissioned() ? 1 : -1;
+ }
+ };
+
+ /**
* Whether the pathname is valid. Currently prohibits relative paths,
* and names which contain a ":" or "/"
*/
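The comparator is intended as a second, stable pass after the distance sort, as the FSNamesystem hunk further down applies it. A condensed usage sketch (variable names hypothetical):

    import java.util.Arrays;

    // locations is already sorted by network distance to the client.
    // Arrays.sort on objects is stable (merge sort), so this pass only
    // moves decommissioned replicas to the end while preserving the
    // distance ordering among the remaining replicas.
    DatanodeInfo[] locations = blk.getLocations();
    Arrays.sort(locations, DFSUtil.DECOM_COMPARATOR);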
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jan 18 23:24:10 2011
@@ -59,7 +59,22 @@ public class DatanodeInfo extends Datano
protected String hostName = null;
// administrative states of a datanode
- public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
+ public enum AdminStates {
+ NORMAL("In Service"),
+ DECOMMISSION_INPROGRESS("Decommission In Progress"),
+ DECOMMISSIONED("Decommissioned");
+
+ final String value;
+
+ AdminStates(final String v) {
+ this.value = v;
+ }
+
+ public String toString() {
+ return value;
+ }
+ }
+
@Nullable
protected AdminStates adminState;
@@ -274,7 +289,7 @@ public class DatanodeInfo extends Datano
/**
* Retrieves the admin state of this node.
*/
- AdminStates getAdminState() {
+ public AdminStates getAdminState() {
if (adminState == null) {
return AdminStates.NORMAL;
}
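The display strings attached to the enum replace the ternary expression previously hard-coded in NamenodeJspHelper (see the hunk further down). For example:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;

    // toString() now yields the strings shown in the web UI:
    System.out.println(AdminStates.NORMAL);                  // In Service
    System.out.println(AdminStates.DECOMMISSION_INPROGRESS); // Decommission In Progress
    System.out.println(AdminStates.DECOMMISSIONED);          // Decommissioned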
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jan 18 23:24:10 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -262,6 +263,8 @@ public class JspHelper {
FIELD_NONDFS_USED = 7,
FIELD_REMAINING = 8,
FIELD_PERCENT_REMAINING = 9,
+ FIELD_ADMIN_STATE = 10,
+ FIELD_DECOMMISSIONED = 11,
SORT_ORDER_ASC = 1,
SORT_ORDER_DSC = 2;
@@ -285,6 +288,10 @@ public class JspHelper {
sortField = FIELD_PERCENT_REMAINING;
} else if (field.equals("blocks")) {
sortField = FIELD_BLOCKS;
+ } else if (field.equals("adminstate")) {
+ sortField = FIELD_ADMIN_STATE;
+ } else if (field.equals("decommissioned")) {
+ sortField = FIELD_DECOMMISSIONED;
} else {
sortField = FIELD_NAME;
}
@@ -332,6 +339,13 @@ public class JspHelper {
case FIELD_BLOCKS:
ret = d1.numBlocks() - d2.numBlocks();
break;
+ case FIELD_ADMIN_STATE:
+ ret = d1.getAdminState().toString().compareTo(
+ d2.getAdminState().toString());
+ break;
+ case FIELD_DECOMMISSIONED:
+ ret = DFSUtil.DECOM_COMPARATOR.compare(d1, d2);
+ break;
case FIELD_NAME:
ret = d1.getHostName().compareTo(d2.getHostName());
break;
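The two new sort fields correspond to the "adminstate" and "decommissioned" query parameters of dfsnodelist.jsp. A hypothetical caller-side sketch (live and dead are ArrayList<DatanodeDescriptor> as in NamenodeJspHelper):

    // Sort the live list by admin state, and the dead list so that
    // decommissioned nodes group together; the field and order strings
    // come from the JSP request parameters.
    JspHelper.sortNodeList(live, "adminstate", "ASC");
    JspHelper.sortNodeList(dead, "decommissioned", "DSC");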
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Jan 18 23:24:10 2011
@@ -124,6 +124,12 @@ public class DatanodeDescriptor extends
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0;
+
+ /**
+ * When set to true, the node is not in the include list and is not
+ * allowed to communicate with the namenode.
+ */
+ private boolean disallowed = false;
/** Default constructor */
public DatanodeDescriptor() {}
@@ -681,4 +687,16 @@ public class DatanodeDescriptor extends
super.updateRegInfo(nodeReg);
volumeFailures = 0;
}
+
+ /**
+ * Set the flag to indicate if this datanode is disallowed from communicating
+ * with the namenode.
+ */
+ void setDisallowed(boolean flag) {
+ disallowed = flag;
+ }
+
+ boolean isDisallowed() {
+ return disallowed;
+ }
}
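Previously a node missing from the include list was marked DECOMMISSIONED, conflating exclusion with decommissioning. The new flag keeps the two states separate; the heartbeat path (see the FSNamesystem hunk below) rejects a disallowed node explicitly:

    // Condensed from handleHeartbeat: a disallowed node is marked dead and
    // told to shut itself down via DisallowedDatanodeException.
    if (nodeinfo != null && nodeinfo.isDisallowed()) {
      setDatanodeDead(nodeinfo);
      throw new DisallowedDatanodeException(nodeinfo);
    }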
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jan 18 23:24:10 2011
@@ -779,6 +779,9 @@ public class FSNamesystem implements FSC
clientMachine);
for (LocatedBlock b : blocks.getLocatedBlocks()) {
clusterMap.pseudoSortByDistance(client, b.getLocations());
+
+ // Move decommissioned datanodes to the bottom
+ Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
}
return blocks;
@@ -2559,6 +2562,7 @@ public class FSNamesystem implements FSC
clusterMap.remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
+ nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
@@ -2573,6 +2577,7 @@ public class FSNamesystem implements FSC
nodeS.isAlive = true;
}
}
+ checkDecommissioning(nodeS, dnAddress);
return;
}
@@ -2593,7 +2598,8 @@ public class FSNamesystem implements FSC
resolveNetworkLocation(nodeDescr);
unprotectedAddDatanode(nodeDescr);
clusterMap.add(nodeDescr);
-
+ checkDecommissioning(nodeDescr, dnAddress);
+
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
heartbeats.add(nodeDescr);
@@ -2699,13 +2705,13 @@ public class FSNamesystem implements FSC
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
-
+
// Check if this datanode should actually be shutdown instead.
- if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
+ if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
-
+
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
@@ -2754,18 +2760,28 @@ public class FSNamesystem implements FSC
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
//
// The statistics are protected by the heartbeat lock
+ // For decommissioning/decommissioned nodes, only used capacity
+ // is counted.
//
assert(Thread.holdsLock(heartbeats));
if (isAdded) {
- capacityTotal += node.getCapacity();
capacityUsed += node.getDfsUsed();
- capacityRemaining += node.getRemaining();
totalLoad += node.getXceiverCount();
+ if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ capacityTotal += node.getCapacity();
+ capacityRemaining += node.getRemaining();
+ } else {
+ capacityTotal += node.getDfsUsed();
+ }
} else {
- capacityTotal -= node.getCapacity();
capacityUsed -= node.getDfsUsed();
- capacityRemaining -= node.getRemaining();
totalLoad -= node.getXceiverCount();
+ if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ capacityTotal -= node.getCapacity();
+ capacityRemaining -= node.getRemaining();
+ } else {
+ capacityTotal -= node.getDfsUsed();
+ }
}
}
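A worked example of the new accounting, with illustrative numbers:

    // A datanode with capacity 100 GB, dfsUsed 30 GB, remaining 60 GB:
    long capacity = 100L, dfsUsed = 30L, remaining = 60L;
    boolean decommissioned = true; // in progress or completed

    long totalDelta     = decommissioned ? dfsUsed : capacity;  // 30, not 100
    long usedDelta      = dfsUsed;                              // 30 either way
    long remainingDelta = decommissioned ? 0L : remaining;      // 0, not 60
    // A decommissioning node thus contributes only the space its replicas
    // still occupy; its free space is excluded from cluster capacity
    // because it can no longer be written to.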
@@ -3072,12 +3088,6 @@ public class FSNamesystem implements FSC
+ nodeID.getName());
}
- // Check if this datanode should actually be shutdown instead.
- if (shouldNodeShutdown(node)) {
- setDatanodeDead(node);
- throw new DisallowedDatanodeException(node);
- }
-
blockManager.processReport(node, newReport);
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
} finally {
@@ -3209,12 +3219,6 @@ public class FSNamesystem implements FSC
+block+" is received from " + nodeID.getName());
}
- // Check if this datanode should actually be shutdown instead.
- if (shouldNodeShutdown(node)) {
- setDatanodeDead(node);
- throw new DisallowedDatanodeException(node);
- }
-
blockManager.addBlock(node, block, delHint);
} finally {
writeUnlock();
@@ -3471,12 +3475,16 @@ public class FSNamesystem implements FSC
throws IOException {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info("Start Decommissioning node " + node.getName());
- node.startDecommission();
+ LOG.info("Start Decommissioning node " + node.getName() + " with " +
+ node.numBlocks() + " blocks.");
+ synchronized (heartbeats) {
+ updateStats(node, false);
+ node.startDecommission();
+ updateStats(node, true);
+ }
node.decommissioningStatus.setStartTime(now());
- //
- // all the blocks that reside on this node have to be
- // replicated.
+
+ // all the blocks that reside on this node have to be replicated.
checkDecommissionStateInternal(node);
}
}
@@ -3486,8 +3494,14 @@ public class FSNamesystem implements FSC
*/
public void stopDecommission (DatanodeDescriptor node)
throws IOException {
- LOG.info("Stop Decommissioning node " + node.getName());
- node.stopDecommission();
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ LOG.info("Stop Decommissioning node " + node.getName());
+ synchronized (heartbeats) {
+ updateStats(node, false);
+ node.stopDecommission();
+ updateStats(node, true);
+ }
+ }
}
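Both methods now follow the same pattern: since a node's contribution to the cluster totals depends on its admin state, its stats are subtracted under the old state and re-added under the new one, inside the heartbeats lock so readers never observe a half-updated total:

    synchronized (heartbeats) {
      updateStats(node, false);  // remove contribution under the old state
      node.startDecommission();  // or node.stopDecommission()
      updateStats(node, true);   // re-add contribution under the new state
    }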
/**
@@ -3574,10 +3588,7 @@ public class FSNamesystem implements FSC
LOG.info("Decommission complete for node " + node.getName());
}
}
- if (node.isDecommissioned()) {
- return true;
- }
- return false;
+ return node.isDecommissioned();
}
/**
@@ -3627,18 +3638,12 @@ public class FSNamesystem implements FSC
DatanodeDescriptor node = it.next();
// Check if not include.
if (!inHostsList(node, null)) {
- node.setDecommissioned(); // case 2.
+ node.setDisallowed(true); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
- if (!node.isDecommissionInProgress() &&
- !node.isDecommissioned()) {
- startDecommission(node); // case 3.
- }
+ startDecommission(node); // case 3.
} else {
- if (node.isDecommissionInProgress() ||
- node.isDecommissioned()) {
- stopDecommission(node); // case 4.
- }
+ stopDecommission(node); // case 4.
}
}
}
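The guards around cases 3 and 4 could be dropped because startDecommission and stopDecommission now check the node's admin state themselves. The resulting decision logic for each registered datanode, condensed:

    if (!inHostsList(node, null)) {
      node.setDisallowed(true);   // case 2: rejected at its next heartbeat
    } else if (inExcludedHostsList(node, null)) {
      startDecommission(node);    // case 3: no-op if already decommissioning
    } else {
      stopDecommission(node);     // case 4: no-op if not decommissioning
    }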
@@ -3654,39 +3659,22 @@ public class FSNamesystem implements FSC
/**
* Checks if the node is not on the hosts list. If it is not, then
- * it will be ignored. If the node is in the hosts list, but is also
- * on the exclude list, then it will be decommissioned.
- * Returns FALSE if node is rejected for registration.
- * Returns TRUE if node is registered (including when it is on the
- * exclude list and is being decommissioned).
+ * it will be disallowed from registering.
*/
- private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr)
- throws IOException {
+ private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
assert (hasWriteLock());
- if (!inHostsList(nodeReg, ipAddr)) {
- return false;
- }
- if (inExcludedHostsList(nodeReg, ipAddr)) {
- DatanodeDescriptor node = getDatanode(nodeReg);
- if (node == null) {
- throw new IOException("verifyNodeRegistration: unknown datanode " +
- nodeReg.getName());
- }
- if (!checkDecommissionStateInternal(node)) {
- startDecommission(node);
- }
- }
- return true;
+ return inHostsList(nodeReg, ipAddr);
}
/**
- * Checks if the Admin state bit is DECOMMISSIONED. If so, then
- * we should shut it down.
- *
- * Returns true if the node should be shutdown.
+ * Decommission the node if it is in the exclude list.
*/
- private boolean shouldNodeShutdown(DatanodeDescriptor node) {
- return (node.isDecommissioned());
+ private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
+ throws IOException {
+ // If the registered node is in exclude list, then decommission it
+ if (inExcludedHostsList(nodeReg, ipAddr)) {
+ startDecommission(nodeReg);
+ }
}
/**
@@ -4941,6 +4929,9 @@ public class FSNamesystem implements FSC
}
}
+ /**
+ * @return list of datanodes where decommissioning is in progress
+ */
public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
readLock();
try {
@@ -4960,10 +4951,9 @@ public class FSNamesystem implements FSC
}
}
- /*
- * Delegation Token
+ /**
+ * Create delegation token secret manager
*/
-
private DelegationTokenSecretManager createDelegationTokenSecretManager(
Configuration conf) {
return new DelegationTokenSecretManager(conf.getLong(
@@ -5288,6 +5278,7 @@ public class FSNamesystem implements FSC
final Map<String, Object> innerinfo = new HashMap<String, Object>();
innerinfo.put("lastContact", getLastContact(node));
innerinfo.put("usedSpace", getDfsUsed(node));
+ innerinfo.put("adminState", node.getAdminState().toString());
info.put(node.getHostName(), innerinfo);
}
return JSON.toString(info);
@@ -5305,6 +5296,7 @@ public class FSNamesystem implements FSC
for (DatanodeDescriptor node : deadNodeList) {
final Map<String, Object> innerinfo = new HashMap<String, Object>();
innerinfo.put("lastContact", getLastContact(node));
+ innerinfo.put("decommissioned", node.isDecommissioned());
info.put(node.getHostName(), innerinfo);
}
return JSON.toString(info);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jan 18 23:24:10 2011
@@ -207,7 +207,17 @@ class NamenodeJspHelper {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
fsn.DFSNodesStatus(live, dead);
+
+ int liveDecommissioned = 0;
+ for (DatanodeDescriptor d : live) {
+ liveDecommissioned += d.isDecommissioned() ? 1 : 0;
+ }
+ int deadDecommissioned = 0;
+ for (DatanodeDescriptor d : dead) {
+ deadDecommissioned += d.isDecommissioned() ? 1 : 0;
+ }
+
ArrayList<DatanodeDescriptor> decommissioning = fsn
.getDecommissioningNodes();
@@ -292,9 +302,13 @@ class NamenodeJspHelper {
+ colTxt() + StringUtils.limitDecimalTo2(dev) + " %"
+ rowTxt() + colTxt()
+ "<a href=\"dfsnodelist.jsp?whatNodes=LIVE\">Live Nodes</a> "
- + colTxt() + ":" + colTxt() + live.size() + rowTxt() + colTxt()
+ + colTxt() + ":" + colTxt() + live.size()
+ + " (Decommissioned: " + liveDecommissioned + ")"
+ + rowTxt() + colTxt()
+ "<a href=\"dfsnodelist.jsp?whatNodes=DEAD\">Dead Nodes</a> "
- + colTxt() + ":" + colTxt() + dead.size() + rowTxt() + colTxt()
+ + colTxt() + ":" + colTxt() + dead.size()
+ + " (Decommissioned: " + deadDecommissioned + ")"
+ + rowTxt() + colTxt()
+ "<a href=\"dfsnodelist.jsp?whatNodes=DECOMMISSIONING\">"
+ "Decommissioning Nodes</a> "
+ colTxt() + ":" + colTxt() + decommissioning.size()
@@ -446,8 +460,11 @@ class NamenodeJspHelper {
*/
generateNodeDataHeader(out, d, suffix, alive, nnHttpPort);
- if (!alive)
+ if (!alive) {
+ out.print("<td class=\"decommissioned\"> " +
+ d.isDecommissioned() + "\n");
return;
+ }
long c = d.getCapacity();
long u = d.getDfsUsed();
@@ -457,9 +474,7 @@ class NamenodeJspHelper {
String percentRemaining = StringUtils.limitDecimalTo2(d
.getRemainingPercent());
- String adminState = (d.isDecommissioned() ? "Decommissioned" : (d
- .isDecommissionInProgress() ? "Decommission In Progress"
- : "In Service"));
+ String adminState = d.getAdminState().toString();
long timestamp = d.getLastUpdate();
long currentTime = System.currentTimeMillis();
@@ -502,7 +517,6 @@ class NamenodeJspHelper {
sorterOrder = "ASC";
JspHelper.sortNodeList(live, sorterField, sorterOrder);
- JspHelper.sortNodeList(dead, "name", "ASC");
// Find out common suffix. Should this be before or after the sort?
String port_suffix = null;
@@ -535,7 +549,6 @@ class NamenodeJspHelper {
int nnHttpPort = nn.getHttpAddress().getPort();
out.print("<div id=\"dfsnodetable\"> ");
if (whatNodes.equals("LIVE")) {
-
out.print("<a name=\"LiveNodes\" id=\"title\">" + "Live Datanodes : "
+ live.size() + "</a>"
+ "<br><br>\n<table border=1 cellspacing=0>\n");
@@ -577,9 +590,11 @@ class NamenodeJspHelper {
if (dead.size() > 0) {
out.print("<table border=1 cellspacing=0> <tr id=\"row1\"> "
- + "<td> Node \n");
+ + "<th " + nodeHeaderStr("node")
+ + "> Node <th " + nodeHeaderStr("decommissioned")
+ + "> Decommissioned\n");
- JspHelper.sortNodeList(dead, "name", "ASC");
+ JspHelper.sortNodeList(dead, sorterField, sorterOrder);
for (int i = 0; i < dead.size(); i++) {
generateNodeData(out, dead.get(i), port_suffix, false, nnHttpPort);
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Jan 18 23:24:10 2011
@@ -501,16 +501,18 @@ public class DFSAdmin extends FsShell {
"Set/Unset/Check flag to attempt restore of failed storage replicas if they become available.\n" +
"\t\tRequires superuser permissions.\n";
- String refreshNodes = "-refreshNodes: \tUpdates the set of hosts allowed " +
- "to connect to namenode.\n\n" +
- "\t\tRe-reads the config file to update values defined by \n" +
- "\t\tdfs.hosts and dfs.host.exclude and reads the \n" +
- "\t\tentires (hostnames) in those files.\n\n" +
- "\t\tEach entry not defined in dfs.hosts but in \n" +
- "\t\tdfs.hosts.exclude is decommissioned. Each entry defined \n" +
- "\t\tin dfs.hosts and also in dfs.host.exclude is stopped from \n" +
- "\t\tdecommissioning if it has aleady been marked for decommission.\n" +
- "\t\tEntires not present in both the lists are decommissioned.\n";
+ String refreshNodes = "-refreshNodes: \tUpdates the namenode with the " +
+ "set of datanodes allowed to connect to the namenode.\n\n" +
+ "\t\tNamenode re-reads datanode hostnames from the file defined by \n" +
+ "\t\tdfs.hosts, dfs.hosts.exclude configuration parameters.\n" +
+ "\t\tHosts defined in dfs.hosts are the datanodes that are part of \n" +
+ "\t\tthe cluster. If there are entries in dfs.hosts, only the hosts \n" +
+ "\t\tin it are allowed to register with the namenode.\n\n" +
+ "\t\tEntries in dfs.hosts.exclude are datanodes that need to be \n" +
+ "\t\tdecommissioned. Datanodes complete decommissioning when \n" +
+ "\t\tall the replicas from them are replicated to other datanodes.\n" +
+ "\t\tDecommissioned nodes are not automatically shutdown and \n" +
+ "\t\tare not chosen for writing new replicas.\n";
String finalizeUpgrade = "-finalizeUpgrade: Finalize upgrade of HDFS.\n" +
"\t\tDatanodes delete their previous version working directories,\n" +
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml Tue Jan 18 23:24:10 2011
@@ -15899,39 +15899,47 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^-refreshNodes:( |\t)*Updates the set of hosts allowed to connect to namenode.( )*</expected-output>
+ <expected-output>^-refreshNodes:( |\t)*Updates the namenode with the set of datanodes allowed to connect to the namenode.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*Re-reads the config file to update values defined by( )*</expected-output>
+ <expected-output>^( |\t)*Namenode re-reads datanode hostnames from the file defined by( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*dfs.hosts and dfs.host.exclude and reads the( )*</expected-output>
+ <expected-output>^( |\t)*dfs.hosts, dfs.hosts.exclude configuration parameters.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*entires \(hostnames\) in those files.( )*</expected-output>
+ <expected-output>^( |\t)*Hosts defined in dfs.hosts are the datanodes that are part of( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*Each entry not defined in dfs.hosts but in( )*</expected-output>
+ <expected-output>^( |\t)*the cluster. If there are entries in dfs.hosts, only the hosts( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*dfs.hosts.exclude is decommissioned. Each entry defined( )*</expected-output>
+ <expected-output>^( |\t)*in it are allowed to register with the namenode.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*in dfs.hosts and also in dfs.host.exclude is stopped from( )*</expected-output>
+ <expected-output>^( |\t)*Entries in dfs.hosts.exclude are datanodes that need to be( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*decommissioning if it has aleady been marked for decommission.( )*</expected-output>
+ <expected-output>^( |\t)*decommissioned. Datanodes complete decommissioning when ( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*Entires not present in both the lists are decommissioned.( )*</expected-output>
+ <expected-output>^( |\t)*all the replicas from them are replicated to other datanodes.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^( |\t)*Decommissioned nodes are not automatically shutdown and( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^( |\t)*are not chosen for writing new replicas.( )*</expected-output>
</comparator>
</comparators>
</test>
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jan 18 23:24:10 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.FileChannel;
@@ -86,6 +89,7 @@ public class MiniDFSCluster {
private long [] simulatedCapacities = null;
// wait until namenode has left safe mode?
private boolean waitSafeMode = true;
+ private boolean setupHostsFile = false;
public Builder(Configuration conf) {
this.conf = conf;
@@ -172,6 +176,15 @@ public class MiniDFSCluster {
}
/**
+ * Default: false
+ * When true, the hosts (include) file for the cluster is set up.
+ */
+ public Builder setupHostsFile(boolean val) {
+ this.setupHostsFile = val;
+ return this;
+ }
+
+ /**
* Construct the actual MiniDFSCluster
*/
public MiniDFSCluster build() throws IOException {
@@ -193,7 +206,8 @@ public class MiniDFSCluster {
builder.racks,
builder.hosts,
builder.simulatedCapacities,
- builder.waitSafeMode);
+ builder.waitSafeMode,
+ builder.setupHostsFile);
}
public class DataNodeProperties {
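A hypothetical use of the new builder option, mirroring testHostsFile at the bottom of this commit; dfs.hosts must point at an include file before the cluster is built:

    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.hosts", hostsFile.toUri().getPath());
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .setupHostsFile(true)  // datanode addresses are appended to dfs.hosts
        .build();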
@@ -390,13 +404,14 @@ public class MiniDFSCluster {
long[] simulatedCapacities) throws IOException {
initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
- simulatedCapacities, true);
+ simulatedCapacities, true, false);
}
private void initMiniDFSCluster(int nameNodePort, Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs,
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
- String[] hosts, long[] simulatedCapacities, boolean waitSafeMode)
+ String[] hosts, long[] simulatedCapacities, boolean waitSafeMode,
+ boolean setupHostsFile)
throws IOException {
this.conf = conf;
base_dir = new File(getBaseDirectory());
@@ -461,8 +476,8 @@ public class MiniDFSCluster {
nameNode = NameNode.createNameNode(args, conf);
// Start the DataNodes
- startDataNodes(conf, numDataNodes, manageDataDfsDirs,
- operation, racks, hosts, simulatedCapacities);
+ startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
+ hosts, simulatedCapacities, setupHostsFile);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
@@ -531,7 +546,8 @@ public class MiniDFSCluster {
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
- long[] simulatedCapacities) throws IOException {
+ long[] simulatedCapacities,
+ boolean setupHostsFile) throws IOException {
int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0
@@ -577,12 +593,6 @@ public class MiniDFSCluster {
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
- // Set up the right ports for the datanodes
- conf.set("dfs.datanode.address", "127.0.0.1:0");
- conf.set("dfs.datanode.http.address", "127.0.0.1:0");
- conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
-
-
String [] dnArgs = (operation == null ||
operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()};
@@ -590,6 +600,8 @@ public class MiniDFSCluster {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
+ // Set up datanode address
+ setupDatanodeAddress(dnConf, setupHostsFile);
if (manageDfsDirs) {
File dir1 = new File(data_dir, "data"+(2*i+1));
File dir2 = new File(data_dir, "data"+(2*i+2));
@@ -671,7 +683,8 @@ public class MiniDFSCluster {
boolean manageDfsDirs, StartupOption operation,
String[] racks
) throws IOException {
- startDataNodes( conf, numDataNodes, manageDfsDirs, operation, racks, null, null);
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
+ null, false);
}
/**
@@ -701,7 +714,7 @@ public class MiniDFSCluster {
String[] racks,
long[] simulatedCapacities) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
- simulatedCapacities);
+ simulatedCapacities, false);
}
/**
@@ -1197,4 +1210,48 @@ public class MiniDFSCluster {
public static String getBaseDirectory() {
return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
}
+
+ private int getFreeSocketPort() {
+ int port = 0;
+ try {
+ ServerSocket s = new ServerSocket(0);
+ port = s.getLocalPort();
+ s.close();
+ return port;
+ } catch (IOException e) {
+ // Could not get a free port. Return default port 0.
+ }
+ return port;
+ }
+
+ private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile) throws IOException {
+ if (setupHostsFile) {
+ String hostsFile = conf.get("dfs.hosts", "").trim();
+ if (hostsFile.length() == 0) {
+ throw new IOException("Parameter dfs.hosts is not setup in conf");
+ }
+ // Setup datanode in the include file, if it is defined in the conf
+ String address = "127.0.0.1:" + getFreeSocketPort();
+ conf.set("dfs.datanode.address", address);
+ addToFile(hostsFile, address);
+ LOG.info("Adding datanode " + address + " to hosts file " + hostsFile);
+ } else {
+ conf.set("dfs.datanode.address", "127.0.0.1:0");
+ conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+ conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
+ }
+ }
+
+ private void addToFile(String p, String address) throws IOException {
+ File f = new File(p);
+ if (!f.exists()) {
+ f.createNewFile();
+ }
+ PrintWriter writer = new PrintWriter(new FileWriter(f, true));
+ try {
+ writer.println(address);
+ } finally {
+ writer.close();
+ }
+ }
}
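Note that getFreeSocketPort has an inherent race: another process can grab the port between close() and the datanode binding it, which is acceptable for a test utility. A slightly tighter variant on Java 7+ (a sketch, not part of this commit) would be:

    private static int getFreeSocketPort() {
      try (ServerSocket s = new ServerSocket(0)) {
        return s.getLocalPort();
      } catch (IOException e) {
        return 0; // fall back to an ephemeral port chosen at bind time
      }
    }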
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1060619&r1=1060618&r2=1060619&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Tue Jan 18 23:24:10 2011
@@ -24,8 +24,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
-import junit.framework.TestCase;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,36 +33,68 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/**
* This class tests the decommissioning of nodes.
*/
-public class TestDecommission extends TestCase {
+public class TestDecommission {
+ public static final Log LOG = LogFactory.getLog(TestDecommission.class);
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
- static final int numDatanodes = 6;
-
+ static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
Random myrand = new Random();
Path hostsFile;
Path excludeFile;
-
- ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
-
- private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
-
- private void writeConfigFile(FileSystem fs, Path name, ArrayList<String> nodes)
+ FileSystem localFileSys;
+ Configuration conf;
+ MiniDFSCluster cluster = null;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+ // Set up the hosts/exclude files.
+ localFileSys = FileSystem.getLocal(conf);
+ Path workingDir = localFileSys.getWorkingDirectory();
+ Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
+ hostsFile = new Path(dir, "hosts");
+ excludeFile = new Path(dir, "exclude");
+
+ // Setup conf
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
+ conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+ writeConfigFile(excludeFile, null);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ cleanupFile(localFileSys, excludeFile.getParent());
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void writeConfigFile(Path name, ArrayList<String> nodes)
throws IOException {
-
// delete if it already exists
- if (fs.exists(name)) {
- fs.delete(name, true);
+ if (localFileSys.exists(name)) {
+ localFileSys.delete(name, true);
}
- FSDataOutputStream stm = fs.create(name);
+ FSDataOutputStream stm = localFileSys.create(name);
if (nodes != null) {
for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
@@ -87,23 +119,17 @@ public class TestDecommission extends Te
stm.close();
}
-
- private void checkFile(FileSystem fileSys, Path name, int repl)
- throws IOException {
- DFSTestUtil.waitReplication(fileSys, name, (short) repl);
- }
-
private void printFileLocations(FileSystem fileSys, Path name)
throws IOException {
BlockLocation[] locations = fileSys.getFileBlockLocations(
fileSys.getFileStatus(name), 0, fileSize);
for (int idx = 0; idx < locations.length; idx++) {
String[] loc = locations[idx].getHosts();
- System.out.print("Block[" + idx + "] : ");
+ StringBuilder buf = new StringBuilder("Block[" + idx + "] : ");
for (int j = 0; j < loc.length; j++) {
- System.out.print(loc[j] + " ");
+ buf.append(loc[j] + " ");
}
- System.out.println("");
+ LOG.info(buf.toString());
}
}
@@ -112,7 +138,7 @@ public class TestDecommission extends Te
* replication factor is 1 more than the specified one.
*/
private void checkFile(FileSystem fileSys, Path name, int repl,
- String downnode) throws IOException {
+ String downnode, int numDatanodes) throws IOException {
//
// sleep an additional 10 seconds for the blockreports from the datanodes
// to arrive.
@@ -126,16 +152,24 @@ public class TestDecommission extends Te
for (LocatedBlock blk : dinfo) { // for each block
int hasdown = 0;
+ int firstDecomNodeIndex = -1;
DatanodeInfo[] nodes = blk.getLocations();
for (int j = 0; j < nodes.length; j++) { // for each replica
if (nodes[j].getName().equals(downnode)) {
hasdown++;
- System.out.println("Block " + blk.getBlock() + " replica " +
- nodes[j].getName() + " is decommissioned.");
+ LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
+ + " is decommissioned.");
+ }
+ if (nodes[j].isDecommissioned()) {
+ if (firstDecomNodeIndex == -1) {
+ firstDecomNodeIndex = j;
+ }
+ continue;
}
+ assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
}
- System.out.println("Block " + blk.getBlock() + " has " + hasdown +
- " decommissioned replica.");
+ LOG.info("Block " + blk.getBlock() + " has " + hasdown
+ + " decommissioned replica.");
assertEquals("Number of replicas for block" + blk.getBlock(),
Math.min(numDatanodes, repl+hasdown), nodes.length);
}
@@ -148,21 +182,21 @@ public class TestDecommission extends Te
}
private void printDatanodeReport(DatanodeInfo[] info) {
- System.out.println("-------------------------------------------------");
+ LOG.info("-------------------------------------------------");
for (int i = 0; i < info.length; i++) {
- System.out.println(info[i].getDatanodeReport());
- System.out.println();
+ LOG.info(info[i].getDatanodeReport());
+ LOG.info("");
}
}
/*
* decommission one random node.
*/
- private String decommissionNode(FSNamesystem namesystem,
- Configuration conf,
- DFSClient client,
- FileSystem localFileSys)
+ private DatanodeInfo decommissionNode(FSNamesystem namesystem,
+ ArrayList<String> decommissionedNodes,
+ AdminStates waitForState)
throws IOException {
+ DFSClient client = getDfsClient(cluster, conf);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
//
@@ -177,122 +211,188 @@ public class TestDecommission extends Te
}
}
String nodename = info[index].getName();
- System.out.println("Decommissioning node: " + nodename);
+ LOG.info("Decommissioning node: " + nodename);
// write nodename into the exclude file.
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
nodes.add(nodename);
- writeConfigFile(localFileSys, excludeFile, nodes);
+ writeConfigFile(excludeFile, nodes);
namesystem.refreshNodes(conf);
- return nodename;
- }
-
- /*
- * Check if node is in the requested state.
- */
- private boolean checkNodeState(FileSystem filesys,
- String node,
- NodeState state) throws IOException {
- DistributedFileSystem dfs = (DistributedFileSystem) filesys;
- boolean done = false;
- boolean foundNode = false;
- DatanodeInfo[] datanodes = dfs.getDataNodeStats();
- for (int i = 0; i < datanodes.length; i++) {
- DatanodeInfo dn = datanodes[i];
- if (dn.getName().equals(node)) {
- if (state == NodeState.DECOMMISSIONED) {
- done = dn.isDecommissioned();
- } else if (state == NodeState.DECOMMISSION_INPROGRESS) {
- done = dn.isDecommissionInProgress();
- } else {
- done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned());
- }
- System.out.println(dn.getDatanodeReport());
- foundNode = true;
- }
- }
- if (!foundNode) {
- throw new IOException("Could not find node: " + node);
- }
- return done;
+ DatanodeInfo ret = namesystem.getDatanode(info[index]);
+ waitNodeState(ret, waitForState);
+ return ret;
}
/*
* Wait till node is fully decommissioned.
*/
- private void waitNodeState(FileSystem filesys,
- String node,
- NodeState state) throws IOException {
- boolean done = checkNodeState(filesys, node, state);
+ private void waitNodeState(DatanodeInfo node,
+ AdminStates state) throws IOException {
+ boolean done = state == node.getAdminState();
while (!done) {
- System.out.println("Waiting for node " + node +
- " to change state to " + state);
+ LOG.info("Waiting for node " + node + " to change state to "
+ + state + " current state: " + node.getAdminState());
try {
- Thread.sleep(1000);
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
} catch (InterruptedException e) {
// nothing
}
- done = checkNodeState(filesys, node, state);
+ done = state == node.getAdminState();
}
}
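waitNodeState polls indefinitely; a bounded variant (hypothetical helper, not part of this commit) would fail the test instead of hanging if the state never arrives:

    private void waitNodeState(DatanodeInfo node, AdminStates state,
        long timeoutMs) throws IOException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (state != node.getAdminState()) {
        if (System.currentTimeMillis() > deadline) {
          throw new IOException("Timed out waiting for " + node.getName()
              + " to reach " + state + "; still " + node.getAdminState());
        }
        try {
          Thread.sleep(HEARTBEAT_INTERVAL * 1000);
        } catch (InterruptedException ignored) {
        }
      }
    }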
- /**
- * Tests Decommission in DFS.
- */
- public void testDecommission() throws IOException {
- Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-
- // Set up the hosts/exclude files.
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = localFileSys.getWorkingDirectory();
- Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
- assertTrue(localFileSys.mkdirs(dir));
- hostsFile = new Path(dir, "hosts");
- excludeFile = new Path(dir, "exclude");
- conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
- conf.setInt("dfs.heartbeat.interval", 1);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
- writeConfigFile(localFileSys, excludeFile, null);
-
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numDatanodes).build();
- cluster.waitActive();
+ /* Get DFSClient to the namenode */
+ private static DFSClient getDfsClient(MiniDFSCluster cluster,
+ Configuration conf) throws IOException {
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
- DFSClient client = new DFSClient(addr, conf);
+ return new DFSClient(addr, conf);
+ }
+
+ /* Validate cluster has expected number of datanodes */
+ private static void validateCluster(DFSClient client, int numDNs)
+ throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
- assertEquals("Number of Datanodes ", numDatanodes, info.length);
- FileSystem fileSys = cluster.getFileSystem();
+ assertEquals("Number of Datanodes ", numDNs, info.length);
+ }
+
+ /** Start a MiniDFSCluster
+ * @throws IOException */
+ private void startCluster(int numDatanodes, Configuration conf)
+ throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+ .build();
+ cluster.waitActive();
+ DFSClient client = getDfsClient(cluster, conf);
+ validateCluster(client, numDatanodes);
+ }
+
+ private void verifyStats(NameNode namenode, FSNamesystem fsn,
+ DatanodeInfo node, boolean decommissioning) throws InterruptedException {
+ // Do the stats check over 10 iterations
+ for (int i = 0; i < 10; i++) {
+ long[] newStats = namenode.getStats();
+
+ // For decommissioning nodes, ensure capacity of the DN is no longer
+ // counted. Only used space of the DN is counted in cluster capacity
+ assertEquals(newStats[0], decommissioning ? node.getDfsUsed() :
+ node.getCapacity());
+
+ // Ensure cluster used capacity is counted for both normal and
+ // decommissioning nodes
+ assertEquals(newStats[1], node.getDfsUsed());
+
+ // For decommissioning nodes, remaining space from the DN is not counted
+ assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
+
+ // Ensure transceiver count is same as that DN
+ assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
+
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
+ }
+ }
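The indices into namenode.getStats() used by verifyStats map to the cluster totals maintained in updateStats:

    long[] stats = namenode.getStats();
    long capacityTotal     = stats[0]; // excludes decommissioning nodes' free space
    long capacityUsed      = stats[1]; // counted for every node
    long capacityRemaining = stats[2]; // 0 contribution from decommissioning nodes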
+ /**
+ * Tests Decommission in DFS.
+ */
+ @Test
+ public void testDecommission() throws IOException {
+ LOG.info("Starting test testDecommission");
+ int numDatanodes = 6;
+ startCluster(numDatanodes, conf);
try {
+ DFSClient client = getDfsClient(cluster, conf);
+ FileSystem fileSys = cluster.getFileSystem();
+ FSNamesystem fsn = cluster.getNamesystem();
+ ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
int replicas = numDatanodes - iteration - 1;
- //
+
// Decommission one node. Verify that node is decommissioned.
- //
- Path file1 = new Path("decommission.dat");
+ Path file1 = new Path("testDecommission.dat");
writeFile(fileSys, file1, replicas);
- System.out.println("Created file decommission.dat with " +
- replicas + " replicas.");
- checkFile(fileSys, file1, replicas);
+ LOG.info("Created file decommission.dat with " + replicas
+ + " replicas.");
printFileLocations(fileSys, file1);
- String downnode = decommissionNode(cluster.getNamesystem(), conf,
- client, localFileSys);
- decommissionedNodes.add(downnode);
- waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
- checkFile(fileSys, file1, replicas, downnode);
+ DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+ AdminStates.DECOMMISSIONED);
+ decommissionedNodes.add(downnode.getName());
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ assertEquals("All datanodes must be alive", numDatanodes,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+
+ checkFile(fileSys, file1, replicas, downnode.getName(), numDatanodes);
cleanupFile(fileSys, file1);
- cleanupFile(localFileSys, dir);
}
+
+ // Restart the cluster and ensure decommissioned datanodes
+ // are allowed to register with the namenode
+ cluster.shutdown();
+ startCluster(numDatanodes, conf);
} catch (IOException e) {
- info = client.datanodeReport(DatanodeReportType.ALL);
+ DFSClient client = getDfsClient(cluster, conf);
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
printDatanodeReport(info);
throw e;
- } finally {
- fileSys.close();
- cluster.shutdown();
}
}
+
+ /**
+ * Tests cluster storage statistics during decommissioning
+ */
+ @Test
+ public void testClusterStats() throws IOException, InterruptedException {
+ LOG.info("Starting test testClusterStats");
+ int numDatanodes = 1;
+ startCluster(numDatanodes, conf);
+
+ FileSystem fileSys = cluster.getFileSystem();
+ Path file = new Path("testClusterStats.dat");
+ writeFile(fileSys, file, 1);
+
+ FSNamesystem fsn = cluster.getNamesystem();
+ NameNode namenode = cluster.getNameNode();
+ ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
+ DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+ AdminStates.DECOMMISSION_INPROGRESS);
+ // Check namenode stats for multiple datanode heartbeats
+ verifyStats(namenode, fsn, downnode, true);
+
+ // Stop decommissioning and verify stats
+ writeConfigFile(excludeFile, null);
+ fsn.refreshNodes(conf);
+ DatanodeInfo ret = fsn.getDatanode(downnode);
+ waitNodeState(ret, AdminStates.NORMAL);
+ verifyStats(namenode, fsn, ret, false);
+ }
+
+ /**
+ * Test host file or include file functionality. Only datanodes
+ * in the include file are allowed to connect to the namenode.
+ */
+ @Test
+ public void testHostsFile() throws IOException, InterruptedException {
+ conf.set("dfs.hosts", hostsFile.toUri().getPath());
+ int numDatanodes = 1;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+ .setupHostsFile(true).build();
+ cluster.waitActive();
+
+ // Rewrite the hosts file so the datanode is no longer included,
+ // ensuring it is disallowed from talking to the namenode,
+ // resulting in its shutdown.
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("invalidhost");
+ writeConfigFile(hostsFile, list);
+ cluster.getNamesystem().refreshNodes(conf);
+
+ DFSClient client = getDfsClient(cluster, conf);
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+ for (int i = 0; i < 5 && info.length != 0; i++) {
+ LOG.info("Waiting for datanode to be marked dead");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ info = client.datanodeReport(DatanodeReportType.LIVE);
+ }
+ assertEquals("Number of live nodes should be 0", 0, info.length);
+ }
}