Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 03:53:21 UTC
svn commit: r1150969 [2/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/or...
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul 26 01:53:10 2011
@@ -31,7 +31,6 @@ import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -135,11 +135,8 @@ import org.apache.hadoop.metrics2.annota
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -147,8 +144,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@@ -240,7 +235,7 @@ public class FSNamesystem implements FSC
// Stores the correct file name hierarchy
//
public FSDirectory dir;
- BlockManager blockManager;
+ private BlockManager blockManager;
// Block pool ID used by this namenode
String blockPoolId;
@@ -287,7 +282,7 @@ public class FSNamesystem implements FSC
Daemon hbthread = null; // HeartbeatMonitor thread
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
- public Daemon replthread = null; // Replication thread
+
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
private volatile boolean hasResourcesAvailable = false;
@@ -299,8 +294,6 @@ public class FSNamesystem implements FSC
// heartbeatExpireInterval is how long namenode waits for datanode to report
// heartbeat
private long heartbeatExpireInterval;
- //replicationRecheckInterval is how often namenode checks for new replication work
- private long replicationRecheckInterval;
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@@ -315,10 +308,6 @@ public class FSNamesystem implements FSC
ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
-
- private DNSToSwitchMapping dnsToSwitchMapping;
-
- private HostsFileReader hostsReader;
private long maxFsObjects = 0; // maximum number of fs objects
@@ -377,9 +366,6 @@ public class FSNamesystem implements FSC
this.dir = new FSDirectory(fsImage, this, conf);
}
this.safeMode = new SafeModeInfo(conf);
- this.hostsReader = new HostsFileReader(
- conf.get(DFSConfigKeys.DFS_HOSTS,""),
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE,""));
if (isBlockTokenEnabled) {
blockTokenSecretManager = new BlockTokenSecretManager(true,
blockKeyUpdateInterval, blockTokenLifetime);
@@ -400,27 +386,13 @@ public class FSNamesystem implements FSC
blockManager.activate(conf);
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
- this.replthread = new Daemon(new ReplicationMonitor());
+
hbthread.start();
lmthread.start();
- replthread.start();
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
- this.dnsToSwitchMapping = ReflectionUtils.newInstance(
- conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- ScriptBasedMapping.class,
- DNSToSwitchMapping.class), conf);
-
- /* If the dns to switch mapping supports cache, resolve network
- * locations of those hosts in the include list,
- * and store the mapping in the cache; so future calls to resolve
- * will be fast.
- */
- if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
- dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
- }
registerMXBean();
DefaultMetricsSystem.instance().register(this);
}
@@ -551,9 +523,7 @@ public class FSNamesystem implements FSC
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
10 * heartbeatInterval;
- this.replicationRecheckInterval =
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
@@ -622,7 +592,6 @@ public class FSNamesystem implements FSC
try {
if (blockManager != null) blockManager.close();
if (hbthread != null) hbthread.interrupt();
- if (replthread != null) replthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
if (nnrmthread != null) nnrmthread.interrupt();
@@ -770,7 +739,7 @@ public class FSNamesystem implements FSC
*
* @return current access keys
*/
- ExportedBlockKeys getBlockKeys() {
+ public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
: ExportedBlockKeys.DUMMY_KEYS;
}
@@ -1838,8 +1807,8 @@ public class FSNamesystem implements FSC
}
// choose new datanodes.
- final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
- src, numAdditionalNodes, clientnode, chosen, true,
+ final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
+ ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
excludes, preferredblocksize);
final LocatedBlock lb = new LocatedBlock(blk, targets);
if (isBlockTokenEnabled) {
@@ -2782,162 +2751,12 @@ public class FSNamesystem implements FSC
throws IOException {
writeLock();
try {
- registerDatanodeInternal(nodeReg);
+ getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
+ checkSafeMode();
} finally {
writeUnlock();
}
}
-
- /** @see #registerDatanode(DatanodeRegistration) */
- public void registerDatanodeInternal(DatanodeRegistration nodeReg)
- throws IOException {
- assert hasWriteLock();
- String dnAddress = Server.getRemoteAddress();
- if (dnAddress == null) {
- // Mostly called inside an RPC.
- // But if not, use address passed by the data-node.
- dnAddress = nodeReg.getHost();
- }
-
- // check if the datanode is allowed to be connect to the namenode
- if (!verifyNodeRegistration(nodeReg, dnAddress)) {
- throw new DisallowedDatanodeException(nodeReg);
- }
-
- String hostName = nodeReg.getHost();
-
- // update the datanode's name with ip:port
- DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
- nodeReg.getStorageID(),
- nodeReg.getInfoPort(),
- nodeReg.getIpcPort());
- nodeReg.updateRegInfo(dnReg);
- nodeReg.exportedKeys = getBlockKeys();
-
- NameNode.stateChangeLog.info(
- "BLOCK* NameSystem.registerDatanode: "
- + "node registration from " + nodeReg.getName()
- + " storage " + nodeReg.getStorageID());
-
- DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN =
- blockManager.getDatanodeManager().getDatanodeByHost(nodeReg.getName());
-
- if (nodeN != null && nodeN != nodeS) {
- NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
- + "node from name: " + nodeN.getName());
- // nodeN previously served a different data storage,
- // which is not served by anybody anymore.
- removeDatanode(nodeN);
- // physically remove node from datanodeMap
- blockManager.getDatanodeManager().wipeDatanode(nodeN);
- nodeN = null;
- }
-
- if (nodeS != null) {
- if (nodeN == nodeS) {
- // The same datanode has been just restarted to serve the same data
- // storage. We do not need to remove old data blocks, the delta will
- // be calculated on the next block report from the datanode
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
- + "node restarted.");
- }
- } else {
- // nodeS is found
- /* The registering datanode is a replacement node for the existing
- data storage, which from now on will be served by a new node.
- If this message repeats, both nodes might have same storageID
- by (insanely rare) random chance. User needs to restart one of the
- nodes with its data cleared (or user can just remove the StorageID
- value in "VERSION" file under the data directory of the datanode,
- but this is might not work if VERSION file format has changed
- */
- NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
- + "node " + nodeS.getName()
- + " is replaced by " + nodeReg.getName() +
- " with the same storageID " +
- nodeReg.getStorageID());
- }
- // update cluster map
- blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
- nodeS.updateRegInfo(nodeReg);
- nodeS.setHostName(hostName);
- nodeS.setDisallowed(false); // Node is in the include list
-
- // resolve network location
- resolveNetworkLocation(nodeS);
- blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- if( !heartbeats.contains(nodeS)) {
- heartbeats.add(nodeS);
- //update its timestamp
- nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
- nodeS.isAlive = true;
- }
- }
- checkDecommissioning(nodeS, dnAddress);
- return;
- }
-
- // this is a new datanode serving a new data storage
- if (nodeReg.getStorageID().equals("")) {
- // this data storage has never been registered
- // it is either empty or was created by pre-storageID version of DFS
- nodeReg.storageID = newStorageID();
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.registerDatanode: "
- + "new storageID " + nodeReg.getStorageID() + " assigned.");
- }
- }
- // register new datanode
- DatanodeDescriptor nodeDescr
- = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
- resolveNetworkLocation(nodeDescr);
- blockManager.getDatanodeManager().addDatanode(nodeDescr);
- checkDecommissioning(nodeDescr, dnAddress);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- heartbeats.add(nodeDescr);
- nodeDescr.isAlive = true;
- // no need to update its timestamp
- // because its is done when the descriptor is created
- }
-
- checkSafeMode();
- }
-
- /* Resolve a node's network location */
- private void resolveNetworkLocation (DatanodeDescriptor node) {
- assert hasWriteLock();
- List<String> names = new ArrayList<String>(1);
- if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
- // get the node's IP address
- names.add(node.getHost());
- } else {
- // get the node's host name
- String hostName = node.getHostName();
- int colon = hostName.indexOf(":");
- hostName = (colon==-1)?hostName:hostName.substring(0,colon);
- names.add(hostName);
- }
-
- // resolve its network location
- List<String> rName = dnsToSwitchMapping.resolve(names);
- String networkLocation;
- if (rName == null) {
- LOG.error("The resolve call returned null! Using " +
- NetworkTopology.DEFAULT_RACK + " for host " + names);
- networkLocation = NetworkTopology.DEFAULT_RACK;
- } else {
- networkLocation = rName.get(0);
- }
- node.setNetworkLocation(networkLocation);
- }
/**
* Get registrationID for datanodes based on the namespaceID.
@@ -2948,26 +2767,8 @@ public class FSNamesystem implements FSC
public String getRegistrationID() {
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
-
- /**
- * Generate new storage ID.
- *
- * @return unique storage ID
- *
- * Note: that collisions are still possible if somebody will try
- * to bring in a data storage from a different cluster.
- */
- private String newStorageID() {
- String newID = null;
- while(newID == null) {
- newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
- if (datanodeMap.get(newID) != null)
- newID = null;
- }
- return newID;
- }
-
- private boolean isDatanodeDead(DatanodeDescriptor node) {
+
+ public boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(now() - heartbeatExpireInterval));
}
@@ -3080,7 +2881,7 @@ public class FSNamesystem implements FSC
return null;
}
- private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+ public void updateStats(DatanodeDescriptor node, boolean isAdded) {
//
// The statistics are protected by the heartbeat lock
// For decommissioning/decommissioned nodes, only used capacity
@@ -3204,77 +3005,7 @@ public class FSNamesystem implements FSC
}
}
- /**
- * Periodically calls computeReplicationWork().
- */
- class ReplicationMonitor implements Runnable {
- static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
- static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
- public void run() {
- while (fsRunning) {
- try {
- computeDatanodeWork();
- blockManager.processPendingReplications();
- Thread.sleep(replicationRecheckInterval);
- } catch (InterruptedException ie) {
- LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
- break;
- } catch (IOException ie) {
- LOG.warn("ReplicationMonitor thread received exception. " + ie);
- } catch (Throwable t) {
- LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
- Runtime.getRuntime().exit(-1);
- }
- }
- }
- }
-
- /////////////////////////////////////////////////////////
- //
- // These methods are called by the Namenode system, to see
- // if there is any work for registered datanodes.
- //
- /////////////////////////////////////////////////////////
- /**
- * Compute block replication and block invalidation work
- * that can be scheduled on data-nodes.
- * The datanode will be informed of this work at the next heartbeat.
- *
- * @return number of blocks scheduled for replication or removal.
- * @throws IOException
- */
- public int computeDatanodeWork() throws IOException {
- int workFound = 0;
- int blocksToProcess = 0;
- int nodesToProcess = 0;
- // Blocks should not be replicated or removed if in safe mode.
- // It's OK to check safe mode here w/o holding lock, in the worst
- // case extra replications will be scheduled, and these will get
- // fixed up later.
- if (isInSafeMode())
- return workFound;
-
- synchronized (heartbeats) {
- blocksToProcess = (int)(heartbeats.size()
- * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
- nodesToProcess = (int)Math.ceil((double)heartbeats.size()
- * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
- }
-
- workFound = blockManager.computeReplicationWork(blocksToProcess);
-
- // Update FSNamesystemMetrics counters
- writeLock();
- try {
- blockManager.updateState();
- blockManager.scheduledReplicationBlocksCount = workFound;
- } finally {
- writeUnlock();
- }
- workFound += blockManager.computeInvalidateWork(nodesToProcess);
- return workFound;
- }
-
+
public void setNodeReplicationLimit(int limit) {
blockManager.maxReplicationStreams = limit;
}
@@ -3282,10 +3013,10 @@ public class FSNamesystem implements FSC
/**
* Remove a datanode descriptor.
* @param nodeID datanode ID.
- * @throws IOException
+ * @throws UnregisteredNodeException
*/
- public void removeDatanode(DatanodeID nodeID)
- throws IOException {
+ public void removeDatanode(final DatanodeID nodeID
+ ) throws UnregisteredNodeException {
writeLock();
try {
DatanodeDescriptor nodeInfo = getDatanode(nodeID);
@@ -3666,83 +3397,23 @@ public class FSNamesystem implements FSC
}
int getNumberOfDatanodes(DatanodeReportType type) {
- return getDatanodeListForReport(type).size();
- }
-
- private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
- DatanodeReportType type) {
readLock();
- try {
- boolean listLiveNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.LIVE;
- boolean listDeadNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.DEAD;
-
- HashMap<String, String> mustList = new HashMap<String, String>();
-
- if (listDeadNodes) {
- //first load all the nodes listed in include and exclude files.
- Iterator<String> it = hostsReader.getHosts().iterator();
- while (it.hasNext()) {
- mustList.put(it.next(), "");
- }
- it = hostsReader.getExcludedHosts().iterator();
- while (it.hasNext()) {
- mustList.put(it.next(), "");
- }
- }
-
- ArrayList<DatanodeDescriptor> nodes = null;
-
- synchronized (datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
- mustList.size());
- Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- while (it.hasNext()) {
- DatanodeDescriptor dn = it.next();
- boolean isDead = isDatanodeDead(dn);
- if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
- nodes.add(dn);
- }
- //Remove any form of the this datanode in include/exclude lists.
- try {
- InetAddress inet = InetAddress.getByName(dn.getHost());
- // compare hostname(:port)
- mustList.remove(inet.getHostName());
- mustList.remove(inet.getHostName()+":"+dn.getPort());
- // compare ipaddress(:port)
- mustList.remove(inet.getHostAddress().toString());
- mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
- } catch ( UnknownHostException e ) {
- mustList.remove(dn.getName());
- mustList.remove(dn.getHost());
- LOG.warn(e);
- }
- }
- }
-
- if (listDeadNodes) {
- Iterator<String> it = mustList.keySet().iterator();
- while (it.hasNext()) {
- DatanodeDescriptor dn =
- new DatanodeDescriptor(new DatanodeID(it.next()));
- dn.setLastUpdate(0);
- nodes.add(dn);
- }
- }
- return nodes;
+ try {
+ return getBlockManager().getDatanodeManager().getDatanodeListForReport(
+ type).size();
} finally {
readUnlock();
}
}
- public DatanodeInfo[] datanodeReport( DatanodeReportType type)
- throws AccessControlException {
+ DatanodeInfo[] datanodeReport(final DatanodeReportType type
+ ) throws AccessControlException {
+ checkSuperuserPrivilege();
readLock();
try {
- checkSuperuserPrivilege();
-
- ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+ final DatanodeManager dm = getBlockManager().getDatanodeManager();
+ final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
+
DatanodeInfo[] arr = new DatanodeInfo[results.size()];
for (int i=0; i<arr.length; i++) {
arr[i] = new DatanodeInfo(results.get(i));
@@ -3806,8 +3477,8 @@ public class FSNamesystem implements FSC
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
- ArrayList<DatanodeDescriptor> results =
- getDatanodeListForReport(DatanodeReportType.ALL);
+ final List<DatanodeDescriptor> results = getBlockManager(
+ ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);
for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (isDatanodeDead(node))
@@ -3838,43 +3509,6 @@ public class FSNamesystem implements FSC
}
}
- /**
- * Start decommissioning the specified datanode.
- */
- private void startDecommission(DatanodeDescriptor node)
- throws IOException {
- assert hasWriteLock();
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info("Start Decommissioning node " + node.getName() + " with " +
- node.numBlocks() + " blocks.");
- synchronized (heartbeats) {
- updateStats(node, false);
- node.startDecommission();
- updateStats(node, true);
- }
- node.decommissioningStatus.setStartTime(now());
-
- // all the blocks that reside on this node have to be replicated.
- checkDecommissionStateInternal(node);
- }
- }
-
- /**
- * Stop decommissioning the specified datanodes.
- */
- public void stopDecommission(DatanodeDescriptor node)
- throws IOException {
- assert hasWriteLock();
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- LOG.info("Stop Decommissioning node " + node.getName());
- synchronized (heartbeats) {
- updateStats(node, false);
- node.stopDecommission();
- updateStats(node, true);
- }
- }
- }
-
public Date getStartTime() {
return new Date(systemStart);
}
@@ -3899,85 +3533,6 @@ public class FSNamesystem implements FSC
return replication;
}
- /**
- * Change, if appropriate, the admin state of a datanode to
- * decommission completed. Return true if decommission is complete.
- */
- public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
- assert hasWriteLock();
- //
- // Check to see if all blocks in this decommissioned
- // node has reached their target replication factor.
- //
- if (node.isDecommissionInProgress()) {
- if (!blockManager.isReplicationInProgress(node)) {
- node.setDecommissioned();
- LOG.info("Decommission complete for node " + node.getName());
- }
- }
- return node.isDecommissioned();
- }
-
- /**
- * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
- */
- private boolean inHostsList(DatanodeID node, String ipAddr) {
- Set<String> hostsList = hostsReader.getHosts();
- return checkInList(node, ipAddr, hostsList, false);
- }
-
- private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
- Set<String> excludeList = hostsReader.getExcludedHosts();
- return checkInList(node, ipAddr, excludeList, true);
- }
-
-
- /**
- * Check if the given node (of DatanodeID or ipAddress) is in the (include or
- * exclude) list. If ipAddress in null, check only based upon the given
- * DatanodeID. If ipAddress is not null, the ipAddress should refers to the
- * same host that given DatanodeID refers to.
- *
- * @param node, DatanodeID, the host DatanodeID
- * @param ipAddress, if not null, should refers to the same host
- * that DatanodeID refers to
- * @param hostsList, the list of hosts in the include/exclude file
- * @param isExcludeList, boolean, true if this is the exclude list
- * @return boolean, if in the list
- */
- private boolean checkInList(DatanodeID node, String ipAddress,
- Set<String> hostsList, boolean isExcludeList) {
- InetAddress iaddr = null;
- try {
- if (ipAddress != null) {
- iaddr = InetAddress.getByName(ipAddress);
- } else {
- iaddr = InetAddress.getByName(node.getHost());
- }
- }catch (UnknownHostException e) {
- LOG.warn("Unknown host in host list: "+ipAddress);
- // can't resolve the host name.
- if (isExcludeList){
- return true;
- } else {
- return false;
- }
- }
-
- // if include list is empty, host is in include list
- if ( (!isExcludeList) && (hostsList.isEmpty()) ){
- return true;
- }
- return // compare ipaddress(:port)
- (hostsList.contains(iaddr.getHostAddress().toString()))
- || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
- + node.getPort()))
- // compare hostname(:port)
- || (hostsList.contains(iaddr.getHostName()))
- || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
- || ((node instanceof DatanodeInfo) && hostsList
- .contains(((DatanodeInfo) node).getHostName()));
- }
/**
* Rereads the config to get hosts and exclude list file names.
@@ -3990,29 +3545,10 @@ public class FSNamesystem implements FSC
*/
public void refreshNodes(Configuration conf) throws IOException {
checkSuperuserPrivilege();
- // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
- // Update the file names and refresh internal includes and excludes list
- if (conf == null)
- conf = new HdfsConfiguration();
- hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS,""),
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
- hostsReader.refresh();
+ getBlockManager().getDatanodeManager().refreshHostsReader(conf);
writeLock();
try {
- for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- // Check if not include.
- if (!inHostsList(node, null)) {
- node.setDisallowed(true); // case 2.
- } else {
- if (inExcludedHostsList(node, null)) {
- startDecommission(node); // case 3.
- } else {
- stopDecommission(node); // case 4.
- }
- }
- }
+ getBlockManager().getDatanodeManager().refreshDatanodes();
} finally {
writeUnlock();
}
@@ -4022,27 +3558,7 @@ public class FSNamesystem implements FSC
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
}
-
- /**
- * Checks if the node is not on the hosts list. If it is not, then
- * it will be disallowed from registering.
- */
- private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
- assert hasWriteLock();
- return inHostsList(nodeReg, ipAddr);
- }
- /**
- * Decommission the node if it is in exclude list.
- */
- private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
- throws IOException {
- assert hasWriteLock();
- // If the registered node is in exclude list, then decommission it
- if (inExcludedHostsList(nodeReg, ipAddr)) {
- startDecommission(nodeReg);
- }
- }
/**
* Get data node by storage ID.
@@ -4051,7 +3567,8 @@ public class FSNamesystem implements FSC
* @return DatanodeDescriptor or null if the node is not found.
* @throws IOException
*/
- public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+ public DatanodeDescriptor getDatanode(DatanodeID nodeID
+ ) throws UnregisteredNodeException {
assert hasReadOrWriteLock();
UnregisteredNodeException e = null;
DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
@@ -5395,8 +4912,8 @@ public class FSNamesystem implements FSC
try {
ArrayList<DatanodeDescriptor> decommissioningNodes =
new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> results =
- getDatanodeListForReport(DatanodeReportType.LIVE);
+ final List<DatanodeDescriptor> results = getBlockManager(
+ ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (node.isDecommissionInProgress()) {
@@ -5808,50 +5325,9 @@ public class FSNamesystem implements FSC
public BlockManager getBlockManager() {
return blockManager;
}
-
- /**
- * Remove an already decommissioned data node who is neither in include nor
- * exclude hosts lists from the the list of live or dead nodes. This is used
- * to not display an already decommssioned data node to the operators.
- * The operation procedure of making a already decommissioned data node not
- * to be displayed is as following:
- * <ol>
- * <li>
- * Host must have been in the include hosts list and the include hosts list
- * must not be empty.
- * </li>
- * <li>
- * Host is decommissioned by remaining in the include hosts list and added
- * into the exclude hosts list. Name node is updated with the new
- * information by issuing dfsadmin -refreshNodes command.
- * </li>
- * <li>
- * Host is removed from both include hosts and exclude hosts lists. Name
- * node is updated with the new informationby issuing dfsamin -refreshNodes
- * command.
- * <li>
- * </ol>
- *
- * @param nodeList
- * , array list of live or dead nodes.
- */
- void removeDecomNodeFromList(ArrayList<DatanodeDescriptor> nodeList) {
- // If the include list is empty, any nodes are welcomed and it does not
- // make sense to exclude any nodes from the cluster. Therefore, no remove.
- if (hostsReader.getHosts().isEmpty()) {
- return;
- }
-
- for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
- && node.isDecommissioned()) {
- // Include list is not empty, an existing datanode does not appear
- // in both include or exclude lists and it has been decommissioned.
- // Remove it from the node list.
- it.remove();
- }
- }
+
+ void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
+ getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
}
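
Context for the FSNamesystem hunks above: the host include/exclude handling, rack resolution, datanode registration, and decommission logic removed here moves behind the BlockManager's DatanodeManager. A minimal sketch of the resulting call pattern follows; the method names are taken only from the call sites visible in this diff, since DatanodeManager's own implementation is not in this part of the commit.

    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

    class DatanodeDelegationSketch {
      // Count datanodes the way the refactored FSNamesystem now does: by asking
      // the DatanodeManager owned by the BlockManager, instead of walking the
      // datanodeMap and hosts lists that used to live in FSNamesystem itself.
      static int countDatanodes(FSNamesystem fsn, DatanodeReportType type) {
        final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
        final List<DatanodeDescriptor> nodes = dm.getDatanodeListForReport(type);
        return nodes.size();
      }
    }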
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jul 26 01:53:10 2011
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.net.NetUtils;
@@ -57,10 +56,10 @@ public class FileChecksumServlets {
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final ServletContext context = getServletContext();
- final Configuration conf =
- (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);
- final NameNode namenode = (NameNode)context.getAttribute("name.node");
+ final NameNode namenode = NameNodeHttpServer.getNameNodeFromContext(
+ context);
final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
try {
final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode,
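
The servlet changes in this and the following files all replace raw ServletContext attribute lookups with static accessors on the new NameNodeHttpServer class. That class is added in another part of this commit, so the sketch below of what those accessors plausibly wrap is an assumption based only on the attribute names being replaced here.

    package org.apache.hadoop.hdfs.server.namenode;

    import javax.servlet.ServletContext;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.common.JspHelper;

    // Assumed shape of the new context accessors; the real NameNodeHttpServer is
    // added elsewhere in this commit and may differ in detail.
    class NameNodeHttpServerContextSketch {
      // Attribute keys carried over from the literal strings the old servlet code used.
      static final String NAMENODE_ATTRIBUTE_KEY = "name.node";
      static final String FSIMAGE_ATTRIBUTE_KEY = "name.system.image";
      static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";

      static NameNode getNameNodeFromContext(ServletContext context) {
        return (NameNode) context.getAttribute(NAMENODE_ATTRIBUTE_KEY);
      }

      static FSImage getFsImageFromContext(ServletContext context) {
        return (FSImage) context.getAttribute(FSIMAGE_ATTRIBUTE_KEY);
      }

      static Configuration getConfFromContext(ServletContext context) {
        return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
      }
    }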
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Jul 26 01:53:10 2011
@@ -65,7 +65,8 @@ public class FileDataServlet extends Dfs
}
// Add namenode address to the url params
- NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+ NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+ getServletContext());
String addr = NameNode.getHostPortString(nn.getNameNodeAddress());
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
@@ -85,7 +86,8 @@ public class FileDataServlet extends Dfs
throws IOException {
if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
// pick a random datanode
- NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+ NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+ getServletContext());
return NamenodeJspHelper.getRandomDatanode(nn);
}
return JspHelper.bestNode(blks);
@@ -101,8 +103,8 @@ public class FileDataServlet extends Dfs
public void doGet(final HttpServletRequest request,
final HttpServletResponse response)
throws IOException {
- final Configuration conf =
- (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(
+ getServletContext());
final UserGroupInformation ugi = getUGI(request, conf);
try {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Tue Jul 26 01:53:10 2011
@@ -30,7 +30,6 @@ import javax.servlet.http.HttpServletRes
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -49,17 +48,15 @@ public class FsckServlet extends DfsServ
final PrintWriter out = response.getWriter();
final InetAddress remoteAddress =
InetAddress.getByName(request.getRemoteAddr());
- final Configuration conf =
- (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
+ final ServletContext context = getServletContext();
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- final ServletContext context = getServletContext();
-
- NameNode nn = (NameNode) context.getAttribute("name.node");
+ NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
final FSNamesystem namesystem = nn.getNamesystem();
final int totalDatanodes =
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,8 +48,7 @@ public class GetDelegationTokenServlet e
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
- final Configuration conf =
- (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
@@ -61,7 +59,7 @@ public class GetDelegationTokenServlet e
return;
}
LOG.info("Sending token: {" + ugi.getUserName() + "," + req.getRemoteAddr() +"}");
- final NameNode nn = (NameNode) context.getAttribute("name.node");
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
String renewer = req.getParameter(RENEWER);
final String renewerFinal = (renewer == null) ?
req.getUserPrincipal().getName() : renewer;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Tue Jul 26 01:53:10 2011
@@ -70,7 +70,7 @@ public class GetImageServlet extends Htt
) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
- final FSImage nnImage = (FSImage)context.getAttribute("name.system.image");
+ final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final GetImageParams parsedParams = new GetImageParams(request, response);
final Configuration conf =
(Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Jul 26 01:53:10 2011
@@ -127,7 +127,7 @@ public class INodeFile extends INode {
size += in.blocks.length;
}
- for(BlockInfo bi: this.blocks) {
+ for(BlockInfo bi: newlist) {
bi.setINode(this);
}
this.blocks = newlist;
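
The INodeFile change above is a small correctness fix: after concatenating block arrays from several files, the back-pointer update must walk the merged newlist, not the old this.blocks, so that blocks adopted from the other files also point at this inode. A self-contained sketch of the pattern, using simplified stand-in types rather than the real HDFS classes:

    // Illustrative only: shows why the merged list, not the old list, must be re-pointed.
    class Block {
      Object inode;
      void setINode(Object n) { inode = n; }
    }

    class ConcatSketch {
      Block[] blocks = new Block[0];

      void appendBlocks(Block[][] others) {
        int size = blocks.length;
        for (Block[] in : others) {
          size += in.length;
        }
        Block[] newlist = new Block[size];
        int pos = 0;
        System.arraycopy(blocks, 0, newlist, pos, blocks.length);
        pos += blocks.length;
        for (Block[] in : others) {
          System.arraycopy(in, 0, newlist, pos, in.length);
          pos += in.length;
        }
        // Walk the merged list so the adopted blocks also get their owner updated;
        // iterating the old this.blocks (the pre-fix behavior) would skip them.
        for (Block bi : newlist) {
          bi.setINode(this);
        }
        blocks = newlist;
      }
    }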
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jul 26 01:53:10 2011
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -67,7 +66,6 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -82,7 +80,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -96,7 +93,6 @@ import org.apache.hadoop.security.Groups
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -209,8 +205,6 @@ public class NameNode implements Namenod
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
- public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
-
protected FSNamesystem namesystem;
protected NamenodeRole role;
/** RPC server. Package-protected for use in tests. */
@@ -226,9 +220,7 @@ public class NameNode implements Namenod
/** RPC server for DN address */
protected InetSocketAddress serviceRPCAddress = null;
/** httpServer */
- protected HttpServer httpServer;
- /** HTTP server address */
- protected InetSocketAddress httpAddress = null;
+ protected NameNodeHttpServer httpServer;
private Thread emptier;
/** only used for testing purposes */
protected boolean stopRequested = false;
@@ -373,9 +365,10 @@ public class NameNode implements Namenod
return NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50070"));
}
-
- protected void setHttpServerAddress(Configuration conf){
- conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, getHostPortString(httpAddress));
+
+ protected void setHttpServerAddress(Configuration conf) {
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+ getHostPortString(getHttpAddress()));
}
protected void loadNamesystem(Configuration conf) throws IOException {
@@ -389,12 +382,21 @@ public class NameNode implements Namenod
NamenodeRegistration setRegistration() {
nodeRegistration = new NamenodeRegistration(
getHostPortString(rpcAddress),
- getHostPortString(httpAddress),
+ getHostPortString(getHttpAddress()),
getFSImage().getStorage(), getRole());
return nodeRegistration;
}
/**
+ * Login as the configured user for the NameNode.
+ */
+ void loginAsNameNodeUser(Configuration conf) throws IOException {
+ InetSocketAddress socAddr = getRpcServerAddress(conf);
+ SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
+
+ /**
* Initialize name-node.
*
* @param conf the configuration
@@ -402,8 +404,7 @@ public class NameNode implements Namenod
protected void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = getRpcServerAddress(conf);
UserGroupInformation.setConfiguration(conf);
- SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
- DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ loginAsNameNodeUser(conf);
int handlerCount =
conf.getInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT);
@@ -515,108 +516,9 @@ public class NameNode implements Namenod
}
private void startHttpServer(final Configuration conf) throws IOException {
- final InetSocketAddress infoSocAddr = getHttpServerAddress(conf);
- final String infoHost = infoSocAddr.getHostName();
- if(UserGroupInformation.isSecurityEnabled()) {
- String httpsUser = SecurityUtil.getServerPrincipal(conf
- .get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), infoHost);
- if (httpsUser == null) {
- LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY
- + " not defined in config. Starting http server as "
- + SecurityUtil.getServerPrincipal(conf
- .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
- .getHostName())
- + ": Kerberized SSL may be not function correctly.");
- } else {
- // Kerberized SSL servers must be run from the host principal...
- LOG.info("Logging in as " + httpsUser + " to start http server.");
- SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
- DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoHost);
- }
- }
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- try {
- this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
- @Override
- public HttpServer run() throws IOException, InterruptedException {
- int infoPort = infoSocAddr.getPort();
- httpServer = new HttpServer("hdfs", infoHost, infoPort,
- infoPort == 0, conf,
- new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")));
-
- boolean certSSL = conf.getBoolean("dfs.https.enable", false);
- boolean useKrb = UserGroupInformation.isSecurityEnabled();
- if (certSSL || useKrb) {
- boolean needClientAuth = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
- DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
- InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf
- .get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
- DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
- Configuration sslConf = new HdfsConfiguration(false);
- if (certSSL) {
- sslConf.addResource(conf.get(
- "dfs.https.server.keystore.resource", "ssl-server.xml"));
- }
- httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth,
- useKrb);
- // assume same ssl port for all datanodes
- InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf
- .get("dfs.datanode.https.address", infoHost + ":" + 50475));
- httpServer.setAttribute("datanode.https.port", datanodeSslPort
- .getPort());
- }
- httpServer.setAttribute("name.node", NameNode.this);
- httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY,
- getNameNodeAddress());
- httpServer.setAttribute("name.system.image", getFSImage());
- httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
- httpServer.addInternalServlet("getDelegationToken",
- GetDelegationTokenServlet.PATH_SPEC,
- GetDelegationTokenServlet.class, true);
- httpServer.addInternalServlet("renewDelegationToken",
- RenewDelegationTokenServlet.PATH_SPEC,
- RenewDelegationTokenServlet.class, true);
- httpServer.addInternalServlet("cancelDelegationToken",
- CancelDelegationTokenServlet.PATH_SPEC,
- CancelDelegationTokenServlet.class, true);
- httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
- true);
- httpServer.addInternalServlet("getimage", "/getimage",
- GetImageServlet.class, true);
- httpServer.addInternalServlet("listPaths", "/listPaths/*",
- ListPathsServlet.class, false);
- httpServer.addInternalServlet("data", "/data/*",
- FileDataServlet.class, false);
- httpServer.addInternalServlet("checksum", "/fileChecksum/*",
- FileChecksumServlets.RedirectServlet.class, false);
- httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
- ContentSummaryServlet.class, false);
- httpServer.start();
-
- // The web-server port can be ephemeral... ensure we have the correct
- // info
- infoPort = httpServer.getPort();
- httpAddress = new InetSocketAddress(infoHost, infoPort);
- setHttpServerAddress(conf);
- LOG.info(getRole() + " Web-server up at: " + httpAddress);
- return httpServer;
- }
- });
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- if(UserGroupInformation.isSecurityEnabled() &&
- conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY) != null) {
- // Go back to being the correct Namenode principal
- LOG.info("Logging back in as "
- + SecurityUtil.getServerPrincipal(conf
- .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
- .getHostName()) + " following http server start.");
- SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
- DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, rpcAddress.getHostName());
- }
- }
+ httpServer = new NameNodeHttpServer(conf, this, getHttpServerAddress(conf));
+ httpServer.start();
+ setHttpServerAddress(conf);
}
/**
@@ -1390,7 +1292,7 @@ public class NameNode implements Namenod
* @return the http address.
*/
public InetSocketAddress getHttpAddress() {
- return httpAddress;
+ return httpServer.getHttpAddress();
}
/**
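
The block of HTTP-server setup deleted from NameNode above (Kerberized SSL login, servlet registration, context attributes) relocates into the new NameNodeHttpServer class, which this part of the commit only calls. The skeleton below is inferred from those call sites (a constructor taking the configuration, the owning NameNode, and the bind address; start(); getHttpAddress()) and is an assumption about the new class, not its actual source.

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.http.HttpServer;
    import org.apache.hadoop.security.authorize.AccessControlList;

    // Skeleton only: the real class also carries the servlet registration and
    // security handling removed from NameNode.startHttpServer() in this commit.
    class NameNodeHttpServerSketch {
      private final Configuration conf;
      private final NameNode nn;
      private final InetSocketAddress bindAddress;
      private HttpServer httpServer;          // embedded Jetty wrapper
      private InetSocketAddress httpAddress;  // bound address (port may be ephemeral)

      NameNodeHttpServerSketch(Configuration conf, NameNode nn,
          InetSocketAddress bindAddress) {
        this.conf = conf;
        this.nn = nn;
        this.bindAddress = bindAddress;
      }

      void start() throws IOException {
        // Mirrors the removed code: build the HttpServer, expose the NameNode to
        // the servlets via a context attribute, then record the port actually
        // bound so NameNode.getHttpAddress() can report it.
        httpServer = new HttpServer("hdfs", bindAddress.getHostName(),
            bindAddress.getPort(), bindAddress.getPort() == 0, conf,
            new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")));
        httpServer.setAttribute("name.node", nn);
        httpServer.start();
        httpAddress = new InetSocketAddress(bindAddress.getHostName(),
            httpServer.getPort());
      }

      InetSocketAddress getHttpAddress() {
        return httpAddress;
      }
    }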
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jul 26 01:53:10 2011
@@ -385,7 +385,7 @@ class NamenodeJspHelper {
static void redirectToRandomDataNode(ServletContext context,
HttpServletRequest request, HttpServletResponse resp) throws IOException,
InterruptedException {
- final NameNode nn = (NameNode) context.getAttribute("name.node");
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final DatanodeID datanode = getRandomDatanode(nn);
@@ -566,12 +566,12 @@ class NamenodeJspHelper {
HttpServletRequest request) throws IOException {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
- final NameNode nn = (NameNode)context.getAttribute("name.node");
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
nn.getNamesystem().DFSNodesStatus(live, dead);
nn.getNamesystem().removeDecomNodeFromList(live);
nn.getNamesystem().removeDecomNodeFromList(dead);
InetSocketAddress nnSocketAddress = (InetSocketAddress) context
- .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+ .getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
+ nnSocketAddress.getPort();
@@ -724,7 +724,7 @@ class NamenodeJspHelper {
this.inode = null;
} else {
this.block = new Block(blockId);
- this.inode = fsn.blockManager.getINode(block);
+ this.inode = fsn.getBlockManager().getINode(block);
}
}
@@ -799,9 +799,9 @@ class NamenodeJspHelper {
doc.startTag("replicas");
- if (fsn.blockManager.blocksMap.contains(block)) {
+ if (fsn.getBlockManager().blocksMap.contains(block)) {
Iterator<DatanodeDescriptor> it =
- fsn.blockManager.blocksMap.nodeIterator(block);
+ fsn.getBlockManager().blocksMap.nodeIterator(block);
while (it.hasNext()) {
doc.startTag("replica");
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -47,8 +47,7 @@ public class RenewDelegationTokenServlet
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
- final Configuration conf =
- (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
@@ -58,7 +57,7 @@ public class RenewDelegationTokenServlet
"Unable to identify or authenticate user");
return;
}
- final NameNode nn = (NameNode) context.getAttribute("name.node");
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
String tokenString = req.getParameter(TOKEN);
if (tokenString == null) {
resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Jul 26 01:53:10 2011
@@ -175,6 +175,7 @@ public class SecondaryNameNode implement
initialize(conf, commandLineOpts);
} catch(IOException e) {
shutdown();
+ LOG.fatal("Failed to start secondary namenode. ", e);
throw e;
}
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec Tue Jul 26 01:53:10 2011
@@ -17,9 +17,10 @@
# RPM Spec file for Hadoop version @version@
#
-%define name hadoop-hdfs
-%define version @version@
-%define release @package.release@
+%define name hadoop-hdfs
+%define version @version@
+%define release @package.release@
+%define major_version %(echo %{version} | cut -d. -f -2)
# Installation Locations
%define _prefix @package.prefix@
@@ -75,7 +76,7 @@ Prefix: %{_conf_dir}
Prefix: %{_log_dir}
Prefix: %{_pid_dir}
Buildroot: %{_build_dir}
-Requires: sh-utils, textutils, /usr/sbin/useradd, /usr/sbin/usermod, /sbin/chkconfig, /sbin/service, jdk >= 1.6, hadoop-common >= %{version}
+Requires: sh-utils, textutils, /usr/sbin/useradd, /usr/sbin/usermod, /sbin/chkconfig, /sbin/service, hadoop-common >= %{major_version}.0, hadoop-common <= %{major_version}.9999
AutoReqProv: no
Provides: hadoop-hdfs
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/test/hdfs:1134994-1148523
+/hadoop/common/trunk/hdfs/src/test/hdfs:1134994-1150966
/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
/hadoop/core/trunk/src/test/hdfs:776175-785643
/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs:987665-1095512
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Jul 26 01:53:10 2011
@@ -59,13 +59,13 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
@@ -282,7 +282,8 @@ public class DFSTestUtil {
do {
Thread.sleep(1000);
- int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
+ int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
+ b.getLocalBlock());
curRacks = r[0];
curReplicas = r[1];
curNeededReplicas = r[2];
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Tue Jul 26 01:53:10 2011
@@ -17,25 +17,30 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
import java.net.URL;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
-
import org.junit.Test;
-import static org.junit.Assert.*;
class MockHttpURLConnection extends HttpURLConnection {
- MockURL m;
-
- public MockHttpURLConnection(URL u, MockURL m) {
- super(u);
- this.m = m;
+ private int responseCode = -1;
+ URL m;
+
+ public MockHttpURLConnection(URL u) {
+ super(u);
+ m = u;
}
public boolean usingProxy(){
@@ -46,7 +51,6 @@ class MockHttpURLConnection extends Http
}
public void connect() throws IOException {
- m.setMsg("Connect: "+url+", Range: "+getRequestProperty("Range"));
}
public InputStream getInputStream() throws IOException {
@@ -64,8 +68,8 @@ class MockHttpURLConnection extends Http
}
public int getResponseCode() {
- if (m.responseCode != -1) {
- return m.responseCode;
+ if (responseCode != -1) {
+ return responseCode;
} else {
if (getRequestProperty("Range") == null) {
return 200;
@@ -74,89 +78,67 @@ class MockHttpURLConnection extends Http
}
}
}
-
-}
-class MockURL extends URLOpener {
- String msg;
- public int responseCode = -1;
-
- public MockURL(URL u) {
- super(u);
+ public void setResponseCode(int resCode) {
+ responseCode = resCode;
}
- public MockURL(String s) throws MalformedURLException {
- this(new URL(s));
- }
-
- public HttpURLConnection openConnection() throws IOException {
- return new MockHttpURLConnection(url, this);
- }
-
- public void setMsg(String s) {
- msg = s;
- }
-
- public String getMsg() {
- return msg;
- }
}
public class TestByteRangeInputStream {
@Test
- public void testByteRange() throws IOException, InterruptedException {
- MockURL o = new MockURL("http://test/");
- MockURL r = new MockURL((URL)null);
- ByteRangeInputStream is = new ByteRangeInputStream(o, r);
+ public void testByteRange() throws IOException {
+ URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
+ doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+ .openConnection();
+ URLOpener rspy = spy(new URLOpener((URL) null));
+ doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+ .openConnection();
+ ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
assertEquals("getPos wrong", 0, is.getPos());
is.read();
- assertEquals("Initial call made incorrectly",
- "Connect: http://test/, Range: null",
- o.getMsg());
+ assertNull("Initial call made incorrectly (Range Check)", ospy
+ .openConnection().getRequestProperty("Range"));
assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
- o.setMsg(null);
-
is.read();
assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
- assertNull("No additional connections should have been made (no seek)",
- o.getMsg());
+ // No additional connections should have been made (no seek)
+
+ rspy.setURL(new URL("http://resolvedurl/"));
- r.setMsg(null);
- r.setURL(new URL("http://resolvedurl/"));
-
is.seek(100);
is.read();
- assertEquals("Seek to 100 bytes made incorrectly",
- "Connect: http://resolvedurl/, Range: bytes=100-",
- r.getMsg());
+ assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+ "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
- assertEquals("getPos should be 101 after reading one byte", 101, is.getPos());
+ assertEquals("getPos should be 101 after reading one byte", 101,
+ is.getPos());
- r.setMsg(null);
+ verify(rspy, times(2)).openConnection();
is.seek(101);
is.read();
- assertNull("Seek to 101 should not result in another request", r.getMsg());
+ verify(rspy, times(2)).openConnection();
+
+ // Seek to 101 should not result in another request
- r.setMsg(null);
is.seek(2500);
is.read();
- assertEquals("Seek to 2500 bytes made incorrectly",
- "Connect: http://resolvedurl/, Range: bytes=2500-",
- r.getMsg());
+ assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+ "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
- r.responseCode = 200;
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
is.seek(500);
try {
@@ -168,7 +150,7 @@ public class TestByteRangeInputStream {
"HTTP_PARTIAL expected, received 200", e.getMessage());
}
- r.responseCode = 206;
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
is.seek(0);
try {
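
The rewritten test replaces the hand-rolled MockURL with Mockito: a real URLOpener is wrapped in spy(), openConnection() is stubbed with doReturn(...) so it hands back a MockHttpURLConnection, and verify(..., times(n)) takes over the connection counting that getMsg() used to do. A minimal, self-contained sketch of that spy/doReturn/verify pattern (the Opener class below is a stand-in invented for the example, not an HDFS class):

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    import org.junit.Assert;
    import org.junit.Test;

    public class SpyPatternTest {
      /** Toy collaborator standing in for URLOpener. */
      public static class Opener {
        public String openConnection() {
          return "real connection";
        }
      }

      @Test
      public void stubAndCountCalls() {
        // A spy wraps a real instance; unstubbed methods keep their behavior.
        Opener ospy = spy(new Opener());

        // doReturn(..).when(spy) stubs without invoking the real method.
        doReturn("mock connection").when(ospy).openConnection();

        Assert.assertEquals("mock connection", ospy.openConnection());
        ospy.openConnection();

        // verify(.., times(n)) replaces the manual call bookkeeping.
        verify(ospy, times(2)).openConnection();
      }
    }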
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Tue Jul 26 01:53:10 2011
@@ -51,6 +51,8 @@ public class TestDecommission {
static final int blockSize = 8192;
static final int fileSize = 16384;
static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+ static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
+ static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
Random myrand = new Random();
Path hostsFile;
@@ -74,7 +76,10 @@ public class TestDecommission {
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+ conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
+
writeConfigFile(excludeFile, null);
}
@@ -118,49 +123,67 @@ public class TestDecommission {
stm.close();
LOG.info("Created file " + name + " with " + repl + " replicas.");
}
-
+
/**
- * For blocks that reside on the nodes that are down, verify that their
- * replication factor is 1 more than the specified one.
+ * Verify that the number of replicas is as expected for each block in
+ * the given file.
+ * For blocks with a decommissioned node, verify that their replication
+ * is 1 more than what is specified.
+ * For blocks without decommissioned nodes, verify their replication is
+ * equal to what is specified.
+ *
+ * @param downnode - if null, there is no decommissioned node for this file.
+ * @return - null if no failure found, else an error message string.
*/
- private void checkFile(FileSystem fileSys, Path name, int repl,
- String downnode, int numDatanodes) throws IOException {
- //
- // sleep an additional 10 seconds for the blockreports from the datanodes
- // to arrive.
- //
+ private String checkFile(FileSystem fileSys, Path name, int repl,
+ String downnode, int numDatanodes) throws IOException {
+ boolean isNodeDown = (downnode != null);
// need a raw stream
- assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
-
- DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
+ assertTrue("Not HDFS:"+fileSys.getUri(),
+ fileSys instanceof DistributedFileSystem);
+ DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
((DistributedFileSystem)fileSys).open(name);
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
-
for (LocatedBlock blk : dinfo) { // for each block
int hasdown = 0;
- int firstDecomNodeIndex = -1;
DatanodeInfo[] nodes = blk.getLocations();
- for (int j = 0; j < nodes.length; j++) { // for each replica
- if (nodes[j].getName().equals(downnode)) {
+ for (int j = 0; j < nodes.length; j++) { // for each replica
+ if (isNodeDown && nodes[j].getName().equals(downnode)) {
hasdown++;
- LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
- + " is decommissioned.");
- }
- if (nodes[j].isDecommissioned()) {
- if (firstDecomNodeIndex == -1) {
- firstDecomNodeIndex = j;
+ //Downnode must actually be decommissioned
+ if (!nodes[j].isDecommissioned()) {
+ return "For block " + blk.getBlock() + " replica on " +
+ nodes[j].getName() + " is given as downnode, " +
+ "but is not decommissioned";
+ }
+ //Decommissioned node (if any) should only be last node in list.
+ if (j != nodes.length - 1) {
+ return "For block " + blk.getBlock() + " decommissioned node "
+ + nodes[j].getName() + " was not last node in list: "
+ + (j + 1) + " of " + nodes.length;
+ }
+ LOG.info("Block " + blk.getBlock() + " replica on " +
+ nodes[j].getName() + " is decommissioned.");
+ } else {
+ //Non-downnodes must not be decommissioned
+ if (nodes[j].isDecommissioned()) {
+ return "For block " + blk.getBlock() + " replica on " +
+ nodes[j].getName() + " is unexpectedly decommissioned";
}
- continue;
}
- assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
}
+
LOG.info("Block " + blk.getBlock() + " has " + hasdown
- + " decommissioned replica.");
- assertEquals("Number of replicas for block " + blk.getBlock(),
- Math.min(numDatanodes, repl+hasdown), nodes.length);
+ + " decommissioned replica.");
+ if(Math.min(numDatanodes, repl+hasdown) != nodes.length) {
+ return "Wrong number of replicas for block " + blk.getBlock() +
+ ": " + nodes.length + ", expected " +
+ Math.min(numDatanodes, repl+hasdown);
+ }
}
+ return null;
}
-
+
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
fileSys.delete(name, true);
@@ -208,6 +231,15 @@ public class TestDecommission {
return ret;
}
+ /* stop decommission of the datanode and wait for it to reach the NORMAL state */
+ private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
+ LOG.info("Recommissioning node: " + decommissionedNode.getName());
+ writeConfigFile(excludeFile, null);
+ cluster.getNamesystem().refreshNodes(conf);
+ waitNodeState(decommissionedNode, AdminStates.NORMAL);
+
+ }
+
/*
* Wait till node is fully decommissioned.
*/
@@ -287,6 +319,14 @@ public class TestDecommission {
}
/**
+ * Tests recommission for a non-federated cluster
+ */
+ @Test
+ public void testRecommission() throws IOException {
+ testRecommission(1, 6);
+ }
+
+ /**
* Test decommission for federated cluster
*/
@Test
@@ -323,15 +363,68 @@ public class TestDecommission {
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
- checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
+ assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
cleanupFile(fileSys, file1);
}
}
-
- // Restart the cluster and ensure decommissioned datanodes
+
+ // Restart the cluster and ensure recommissioned datanodes
// are allowed to register with the namenode
cluster.shutdown();
startCluster(numNamenodes, numDatanodes, conf);
+ cluster.shutdown();
+ }
+
+
+ private void testRecommission(int numNamenodes, int numDatanodes)
+ throws IOException {
+ LOG.info("Starting test testRecommission");
+
+ startCluster(numNamenodes, numDatanodes, conf);
+
+ ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
+ new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+ for(int i = 0; i < numNamenodes; i++) {
+ namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+ }
+ Path file1 = new Path("testDecommission.dat");
+ int replicas = numDatanodes - 1;
+
+ for (int i = 0; i < numNamenodes; i++) {
+ ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+ FileSystem fileSys = cluster.getFileSystem(i);
+ writeFile(fileSys, file1, replicas);
+
+ // Decommission one node. Verify that node is decommissioned.
+ DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+ AdminStates.DECOMMISSIONED);
+ decommissionedNodes.add(decomNode);
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+ assertEquals("All datanodes must be alive", numDatanodes,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+ assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+
+ // stop decommission and check if the new replicas are removed
+ recomissionNode(decomNode);
+ // wait for the block to be deleted
+ int tries = 0;
+ while (tries++ < 20) {
+ try {
+ Thread.sleep(1000);
+ if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
+ break;
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+ cleanupFile(fileSys, file1);
+ assertTrue("Checked if node was recommissioned " + tries + " times.",
+ tries < 20);
+ LOG.info("tried: " + tries + " times before recommissioned");
+ }
+ cluster.shutdown();
}
/**
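
testRecommission clears the exclude file via recomissionNode(), waits for the node to return to NORMAL, and then polls checkFile() once a second, up to 20 times, until the extra replicas created during decommission are gone. The same poll-until-true idiom, pulled out as a small stand-alone helper (hypothetical code, not part of this patch):

    import java.util.concurrent.Callable;

    public final class WaitFor {
      private WaitFor() {}

      /**
       * Polls the condition once per intervalMs until it returns true or
       * maxTries attempts have been made; returns whether it was met.
       */
      public static boolean poll(Callable<Boolean> condition, int maxTries,
          long intervalMs) throws Exception {
        for (int tries = 0; tries < maxTries; tries++) {
          if (Boolean.TRUE.equals(condition.call())) {
            return true;
          }
          try {
            Thread.sleep(intervalMs);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
          }
        }
        return false;
      }
    }

In the test above the condition would be checkFile(fileSys, file1, replicas, null, numDatanodes) == null, and asserting on the returned boolean reads the same as the existing tries < 20 assertion.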
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Jul 26 01:53:10 2011
@@ -68,7 +68,7 @@ public class TestReplicationPolicy exten
throw (RuntimeException)new RuntimeException().initCause(e);
}
final BlockManager bm = namenode.getNamesystem().getBlockManager();
- replicator = bm.replicator;
+ replicator = bm.getBlockPlacementPolicy();
cluster = bm.getDatanodeManager().getNetworkTopology();
// construct network topology
for(int i=0; i<NUM_OF_DATANODES; i++) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Tue Jul 26 01:53:10 2011
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -93,7 +93,7 @@ public class TestJspHelper {
//Set the nnaddr url parameter to null.
when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(null);
InetSocketAddress addr = new InetSocketAddress("localhost", 2222);
- when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+ when(context.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
.thenReturn(addr);
verifyServiceInToken(context, request, addr.getAddress().getHostAddress()
+ ":2222");
@@ -102,7 +102,7 @@ public class TestJspHelper {
token.setService(new Text("3.3.3.3:3333"));
tokenString = token.encodeToUrlString();
//Set the name.node.address attribute in Servlet context to null
- when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+ when(context.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
.thenReturn(null);
when(request.getParameter(JspHelper.DELEGATION_PARAMETER_NAME)).thenReturn(
tokenString);
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Tue Jul 26 01:53:10 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -220,7 +221,8 @@ public class TestBlockReport {
cluster.getNameNode().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
- cluster.getNamesystem().computeDatanodeWork();
+ BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
+ .getBlockManager());
printStats();
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Tue Jul 26 01:53:10 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -191,7 +192,7 @@ public class TestDataNodeVolumeFailure {
// now check the number of under-replicated blocks
FSNamesystem fsn = cluster.getNamesystem();
// force update of all the metric counts by calling computeDatanodeWork
- fsn.computeDatanodeWork();
+ BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
// get all the counts
long underRepl = fsn.getUnderReplicatedBlocks();
long pendRepl = fsn.getPendingReplicationBlocks();
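
This test, TestBlockReport above, and NNThroughputBenchmark below all used to call FSNamesystem.computeDatanodeWork() directly; with that logic moved into BlockManager they now go through BlockManagerTestUtil. The utility itself is not included in this mail; judging by the call sites it simply re-exposes block manager internals to test code, roughly along these lines (the method bodies and the replicationThread field name are assumptions, only the signatures come from the callers):

    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.util.Daemon;

    // Hypothetical sketch; the real BlockManagerTestUtil in the tree may differ.
    public class BlockManagerTestUtil {
      /** Delegate to the replication work computation, no longer exposed on FSNamesystem. */
      public static int getComputedDatanodeWork(final BlockManager blockManager) {
        return blockManager.computeDatanodeWork();   // assumed accessor
      }

      /** Expose the replication monitor thread so benchmarks can stop and join it. */
      public static Daemon getReplicationThread(final BlockManager blockManager) {
        return blockManager.replicationThread;       // assumed field
      }
    }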
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Tue Jul 26 01:53:10 2011
@@ -24,6 +24,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Test;
import org.mockito.Mockito;
@@ -63,5 +64,33 @@ public class TestRoundRobinVolumesPolicy
// Passed.
}
}
+
+ // chooseVolume should throw DiskOutOfSpaceException with volume and block sizes in the exception message.
+ @Test
+ public void testRRPolicyExceptionMessage()
+ throws Exception {
+ final List<FSVolume> volumes = new ArrayList<FSVolume>();
+
+ // First volume, with 500 bytes of space.
+ volumes.add(Mockito.mock(FSVolume.class));
+ Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
+
+ // Second volume, with 600 bytes of space.
+ volumes.add(Mockito.mock(FSVolume.class));
+ Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
+
+ RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();
+ int blockSize = 700;
+ try {
+ policy.chooseVolume(volumes, blockSize);
+ Assert.fail("expected to throw DiskOutOfSpaceException");
+ } catch (DiskOutOfSpaceException e) {
+ Assert
+ .assertEquals(
+ "Not returnig the expected message",
+ "Insufficient space for an additional block. Volume with the most available space has 600 bytes free, configured block size is " + blockSize, e
+ .getMessage());
+ }
+ }
}
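
The new testRRPolicyExceptionMessage pins down both the exception type and the message produced when no volume can hold the requested block. The policy implementation is not part of this hunk; a rough sketch of a round-robin chooseVolume that would satisfy the test, written against the FSVolume.getAvailable() accessor the mocks stub (the details here are an assumption, not the committed code):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
    import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

    // Illustrative sketch only; the committed RoundRobinVolumesPolicy may differ.
    public class RoundRobinVolumesPolicySketch {
      private int curVolume = 0;

      public synchronized FSVolume chooseVolume(List<FSVolume> volumes,
          long blockSize) throws IOException {
        if (volumes.isEmpty()) {
          throw new DiskOutOfSpaceException("No volumes configured");
        }
        int startVolume = curVolume;
        long maxAvailable = 0;
        while (true) {
          FSVolume volume = volumes.get(curVolume);
          curVolume = (curVolume + 1) % volumes.size();
          long available = volume.getAvailable();
          if (available > maxAvailable) {
            maxAvailable = available;
          }
          if (available > blockSize) {
            return volume;   // enough room on this volume, hand it out
          }
          if (curVolume == startVolume) {
            // Walked the whole list without finding space; give up with the
            // message format asserted by testRRPolicyExceptionMessage.
            throw new DiskOutOfSpaceException(
                "Insufficient space for an additional block. Volume with the most "
                + "available space has " + maxAvailable
                + " bytes free, configured block size is " + blockSize);
          }
        }
      }
    }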
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Tue Jul 26 01:53:10 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -1111,9 +1112,11 @@ public class NNThroughputBenchmark {
// start data-nodes; create a bunch of files; generate block reports.
blockReportObject.generateInputs(ignore);
// stop replication monitor
- namesystem.replthread.interrupt();
+ BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
+ .interrupt();
try {
- namesystem.replthread.join();
+ BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
+ .join();
} catch(InterruptedException ei) {
return;
}
@@ -1156,7 +1159,8 @@ public class NNThroughputBenchmark {
assert daemonId < numThreads : "Wrong daemonId.";
long start = System.currentTimeMillis();
// compute data-node work
- int work = nameNode.getNamesystem().computeDatanodeWork();
+ int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
+ .getNamesystem().getBlockManager());
long end = System.currentTimeMillis();
numPendingBlocks += work;
if(work == 0)