Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 03:53:21 UTC
svn commit: r1150969 [1/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/or...
Author: todd
Date: Tue Jul 26 01:53:10 2011
New Revision: 1150969
URL: http://svn.apache.org/viewvc?rev=1150969&view=rev
Log:
Merge hdfs and common trunk into branch
Added:
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
- copied unchanged from r1150966, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
- copied unchanged from r1150966, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
Modified:
hadoop/common/branches/HDFS-1073/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java
hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/datanode/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/block_info_xml.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/corrupt_files.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/corrupt_replicas_xml.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/dfshealth.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/dfsnodelist.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/secondary/ (props changed)
Propchange: hadoop/common/branches/HDFS-1073/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs:1134994-1148523
+/hadoop/common/trunk/hdfs:1134994-1150966
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-1052:987665-1095512
/hadoop/hdfs/branches/HDFS-265:796829-820463
Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt Tue Jul 26 01:53:10 2011
@@ -294,6 +294,9 @@ Trunk (unreleased changes)
HDFS-2083. Query JMX statistics over http via JMXJsonServlet. (tanping)
+ HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major
+ version for common and hdfs. (eyang via omalley)
+
IMPROVEMENTS
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
@@ -579,6 +582,25 @@ Trunk (unreleased changes)
and Random object creation to DFSUtil; move DFSClient.stringifyToken(..)
to DelegationTokenIdentifier. (szetszwo)
+ HDFS-1774. Small optimization to FSDataset. (Uma Maheswara Rao G via eli)
+
+ HDFS-2167. Move dnsToSwitchMapping and hostsReader from FSNamesystem to
+ DatanodeManager. (szetszwo)
+
+ HDFS-2116. Use Mockito in TestStreamFile and TestByteRangeInputStream.
+ (Plamen Jeliazkov via shv)
+
+ HDFS-2112. Move ReplicationMonitor to block management. (Uma Maheswara
+ Rao G via szetszwo)
+
+ HDFS-1739. Add available volume size to the error message when datanode
+ throws DiskOutOfSpaceException. (Uma Maheswara Rao G via szetszwo)
+
+ HDFS-2144. If SNN shuts down during initialization it does not log the
+ cause. (Ravi Prakash via atm)
+
+ HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -859,6 +881,11 @@ Trunk (unreleased changes)
HDFS-2152. TestWriteConfigurationToDFS causes random failures. (Uma
Maheswara Rao G via atm)
+ HDFS-2114. re-commission of a decommissioned node does not delete
+ excess replicas. (John George via mattf)
+
+ HDFS-1776. Bug in Concat code. (Bharath Mundlapudi via Dmytro Molkov)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1148523
+/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1150966
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
/hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1148523
+/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1150966
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java Tue Jul 26 01:53:10 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
/**
* An HTTPS/SSL proxy to HDFS, implementing certificate-based access control.
@@ -70,7 +71,7 @@ public class HdfsProxy {
this.server = new ProxyHttpServer(sslAddr, sslConf);
this.server.setAttribute("proxy.https.port", server.getPort());
- this.server.setAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY, nnAddr);
+ this.server.setAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY, nnAddr);
this.server.setAttribute(JspHelper.CURRENT_CONF, new HdfsConfiguration());
this.server.addGlobalFilter("ProxyFilter", ProxyFilter.class.getName(), null);
this.server.addServlet("listPaths", "/listPaths/*", ProxyListPathsServlet.class);
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java Tue Jul 26 01:53:10 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.FileDataServlet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.security.UserGroupInformation;
/** {@inheritDoc} */
@@ -47,7 +48,7 @@ public class ProxyFileDataServlet extend
dtParam=JspHelper.getDelegationTokenUrlParam(dt);
}
InetSocketAddress nnAddress = (InetSocketAddress) getServletContext()
- .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+ .getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
String nnHostPort = nnAddress == null ? null : NameNode
.getHostPortString(nnAddress);
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS,
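Both proxy changes above hinge on the same mechanism: the HTTP server publishes the namenode address as a servlet-context attribute, and servlets read it back by key. A minimal sketch of that handoff, using a hypothetical attribute key and servlet rather than the actual HDFS classes:

    import java.net.InetSocketAddress;
    import javax.servlet.ServletContext;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    // Hypothetical publisher: stores the address under a well-known key,
    // analogous to NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY.
    class AddressPublisher {
      static final String ADDRESS_KEY = "example.namenode.address"; // assumed key

      static void publish(ServletContext context, InetSocketAddress addr) {
        context.setAttribute(ADDRESS_KEY, addr);
      }
    }

    // Hypothetical servlet reading the attribute back, as the proxy servlets do.
    class ExampleServlet extends HttpServlet {
      @Override
      protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        InetSocketAddress nnAddr = (InetSocketAddress) getServletContext()
            .getAttribute(AddressPublisher.ADDRESS_KEY);
        resp.setHeader("X-NameNode-Address",
            nnAddr == null ? "unknown" : nnAddr.toString());
      }
    }

Keeping the key constant on the HTTP-server class, as this commit does, means a rename breaks at compile time instead of silently returning null at runtime.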
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/java:1134994-1148523
+/hadoop/common/trunk/hdfs/src/java:1134994-1150966
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Jul 26 01:53:10 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -33,15 +35,16 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.Daemon;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -98,6 +102,9 @@ public class BlockManager {
return excessBlocksCount;
}
+ /** replicationRecheckInterval is how often the namenode checks for new replication work. */
+ private final long replicationRecheckInterval;
+
/**
* Mapping: Block -> { INode, datanodes, self ref }
* Updated only in response to client-sent information.
@@ -105,11 +112,12 @@ public class BlockManager {
public final BlocksMap blocksMap;
private final DatanodeManager datanodeManager;
-
- //
- // Store blocks-->datanodedescriptor(s) map of corrupt replicas
- //
- private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+
+ /** Replication thread. */
+ final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+
+ /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
+ final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
//
// Keeps a Collection for every named machine containing
@@ -136,34 +144,33 @@ public class BlockManager {
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
private final PendingReplicationBlocks pendingReplications;
- // The maximum number of replicas allowed for a block
+ /** The maximum number of replicas allowed for a block */
public final int maxReplication;
- // How many outgoing replication streams a given node should have at one time
+ /** The maximum number of outgoing replication streams
+ * a given node should have at one time
+ */
public int maxReplicationStreams;
- // Minimum copies needed or else write is disallowed
+ /** Minimum copies needed or else write is disallowed */
public final int minReplication;
- // Default number of replicas
+ /** Default number of replicas */
public final int defaultReplication;
- // How many entries are returned by getCorruptInodes()
+ /** The maximum number of entries returned by getCorruptInodes() */
final int maxCorruptFilesReturned;
- // variable to enable check for enough racks
+ /** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
- /**
- * Last block index used for replication work.
- */
+ /** Last block index used for replication work. */
private int replIndex = 0;
- // for block replicas placement
- public final BlockPlacementPolicy replicator;
+ /** for block replicas placement */
+ private BlockPlacementPolicy blockplacement;
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
- datanodeManager = new DatanodeManager(fsn);
-
+ datanodeManager = new DatanodeManager(fsn, conf);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
- replicator = BlockPlacementPolicy.getInstance(
+ blockplacement = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -197,22 +204,29 @@ public class BlockManager {
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
: true;
+
+ this.replicationRecheckInterval =
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
FSNamesystem.LOG.info("maxReplication = " + maxReplication);
FSNamesystem.LOG.info("minReplication = " + minReplication);
FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
+ FSNamesystem.LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
}
public void activate(Configuration conf) {
pendingReplications.start();
datanodeManager.activate(conf);
+ this.replicationThread.start();
}
public void close() {
if (pendingReplications != null) pendingReplications.stop();
blocksMap.close();
datanodeManager.close();
+ if (replicationThread != null) replicationThread.interrupt();
}
/** @return the datanodeManager */
@@ -220,6 +234,19 @@ public class BlockManager {
return datanodeManager;
}
+ /** @return the BlockPlacementPolicy */
+ public BlockPlacementPolicy getBlockPlacementPolicy() {
+ return blockplacement;
+ }
+
+ /** Set BlockPlacementPolicy */
+ public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
+ if (newpolicy == null) {
+ throw new HadoopIllegalArgumentException("newpolicy == null");
+ }
+ this.blockplacement = newpolicy;
+ }
+
public void metaSave(PrintWriter out) {
//
// Dump contents of neededReplication
@@ -551,7 +578,7 @@ public class BlockManager {
}
}
- void removeFromInvalidates(String storageID, Block block) {
+ private void removeFromInvalidates(String storageID, Block block) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
pendingDeletionBlocksCount--;
@@ -921,7 +948,7 @@ public class BlockManager {
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the Inode itself.
DatanodeDescriptor targets[] =
- replicator.chooseTarget(fileINode, additionalReplRequired,
+ blockplacement.chooseTarget(fileINode, additionalReplRequired,
srcNode, containingNodes, block.getNumBytes());
if(targets.length == 0)
return false;
@@ -1021,7 +1048,7 @@ public class BlockManager {
final HashMap<Node, Node> excludedNodes,
final long blocksize) throws IOException {
// choose targets for the new block to be allocated.
- final DatanodeDescriptor targets[] = replicator.chooseTarget(
+ final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
src, numOfReplicas, client, excludedNodes, blocksize);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
@@ -1240,7 +1267,7 @@ public class BlockManager {
}
}
- void reportDiff(DatanodeDescriptor dn,
+ private void reportDiff(DatanodeDescriptor dn,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
@@ -1670,7 +1697,7 @@ public class BlockManager {
}
}
namesystem.chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint, replicator);
+ addedNode, delNodeHint, blockplacement);
}
public void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1694,7 +1721,7 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
- public void removeStoredBlock(Block block, DatanodeDescriptor node) {
+ private void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+ block + " from " + node.getName());
@@ -1878,10 +1905,30 @@ public class BlockManager {
}
/**
+ * On stopping decommission, check if the node has excess replicas.
+ * If there are any excess replicas, call processOverReplicatedBlock()
+ */
+ private void processOverReplicatedBlocksOnReCommission(
+ final DatanodeDescriptor srcNode) {
+ final Iterator<? extends Block> it = srcNode.getBlockIterator();
+ while(it.hasNext()) {
+ final Block block = it.next();
+ INodeFile fileINode = blocksMap.getINode(block);
+ short expectedReplication = fileINode.getReplication();
+ NumberReplicas num = countNodes(block);
+ int numCurrentReplica = num.liveReplicas();
+ if (numCurrentReplica > expectedReplication) {
+ // over-replicated block
+ processOverReplicatedBlock(block, expectedReplication, null, null);
+ }
+ }
+ }
+
+ /**
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
*/
- public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+ boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
@@ -2003,7 +2050,7 @@ public class BlockManager {
}
/** Remove a datanode from the invalidatesSet */
- public void removeFromInvalidates(String storageID) {
+ private void removeFromInvalidates(String storageID) {
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
if (blocks != null) {
pendingDeletionBlocksCount -= blocks.size();
@@ -2067,28 +2114,6 @@ public class BlockManager {
namesystem.writeUnlock();
}
}
-
- //Returns the number of racks over which a given block is replicated
- //decommissioning/decommissioned nodes are not counted. corrupt replicas
- //are also ignored
- public int getNumberOfRacks(Block b) {
- HashSet<String> rackSet = new HashSet<String>(0);
- Collection<DatanodeDescriptor> corruptNodes =
- corruptReplicas.getNodes(b);
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
- it.hasNext();) {
- DatanodeDescriptor cur = it.next();
- if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
- if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
- String rackName = cur.getNetworkLocation();
- if (!rackSet.contains(rackName)) {
- rackSet.add(rackName);
- }
- }
- }
- }
- return rackSet.size();
- }
boolean blockHasEnoughRacks(Block b) {
if (!this.shouldCheckForEnoughRacks) {
@@ -2190,4 +2215,118 @@ public class BlockManager {
return neededReplications
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
+
+ /**
+ * Change, if appropriate, the admin state of a datanode to
+ * decommission completed. Return true if decommission is complete.
+ */
+ boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+ // Check to see if all blocks in this decommissioned
+ // node have reached their target replication factor.
+ if (node.isDecommissionInProgress()) {
+ if (!isReplicationInProgress(node)) {
+ node.setDecommissioned();
+ LOG.info("Decommission complete for node " + node.getName());
+ }
+ }
+ return node.isDecommissioned();
+ }
+
+ /** Start decommissioning the specified datanode. */
+ void startDecommission(DatanodeDescriptor node) throws IOException {
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ LOG.info("Start Decommissioning node " + node.getName() + " with " +
+ node.numBlocks() + " blocks.");
+ synchronized (namesystem.heartbeats) {
+ namesystem.updateStats(node, false);
+ node.startDecommission();
+ namesystem.updateStats(node, true);
+ }
+ node.decommissioningStatus.setStartTime(now());
+
+ // all the blocks that reside on this node have to be replicated.
+ checkDecommissionStateInternal(node);
+ }
+ }
+
+ /** Stop decommissioning the specified datanodes. */
+ void stopDecommission(DatanodeDescriptor node) throws IOException {
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ LOG.info("Stop Decommissioning node " + node.getName());
+ synchronized (namesystem.heartbeats) {
+ namesystem.updateStats(node, false);
+ node.stopDecommission();
+ namesystem.updateStats(node, true);
+ }
+ processOverReplicatedBlocksOnReCommission(node);
+ }
+ }
+
+ /**
+ * Periodically calls computeDatanodeWork() to compute replication and invalidation work.
+ */
+ private class ReplicationMonitor implements Runnable {
+ static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+ static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+
+ @Override
+ public void run() {
+ while (namesystem.isRunning()) {
+ try {
+ computeDatanodeWork();
+ processPendingReplications();
+ Thread.sleep(replicationRecheckInterval);
+ } catch (InterruptedException ie) {
+ LOG.warn("ReplicationMonitor thread received InterruptedException.", ie);
+ break;
+ } catch (IOException ie) {
+ LOG.warn("ReplicationMonitor thread received exception. " , ie);
+ } catch (Throwable t) {
+ LOG.warn("ReplicationMonitor thread received Runtime exception. ", t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Compute block replication and block invalidation work that can be scheduled
+ * on data-nodes. The datanode will be informed of this work at the next
+ * heartbeat.
+ *
+ * @return number of blocks scheduled for replication or removal.
+ * @throws IOException
+ */
+ int computeDatanodeWork() throws IOException {
+ int workFound = 0;
+ int blocksToProcess = 0;
+ int nodesToProcess = 0;
+ // Blocks should not be replicated or removed if in safe mode.
+ // It's OK to check safe mode here w/o holding lock, in the worst
+ // case extra replications will be scheduled, and these will get
+ // fixed up later.
+ if (namesystem.isInSafeMode())
+ return workFound;
+
+ synchronized (namesystem.heartbeats) {
+ blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+ nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
+ * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
+ }
+
+ workFound = this.computeReplicationWork(blocksToProcess);
+
+ // Update FSNamesystemMetrics counters
+ namesystem.writeLock();
+ try {
+ this.updateState();
+ this.scheduledReplicationBlocksCount = workFound;
+ } finally {
+ namesystem.writeUnlock();
+ }
+ workFound += this.computeInvalidateWork(nodesToProcess);
+ return workFound;
+ }
+
}
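For readers unfamiliar with the pattern, the ReplicationMonitor added above is the standard Hadoop background-daemon loop: do a unit of work, sleep for the recheck interval, exit cleanly on interrupt. A stripped-down, self-contained sketch of the same shape, with illustrative names and an empty work body in place of the real BlockManager internals:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Minimal periodic-monitor sketch in the style of ReplicationMonitor.
    class PeriodicMonitor implements Runnable {
      private final AtomicBoolean running = new AtomicBoolean(true);
      private final long recheckIntervalMs; // analogous to replicationRecheckInterval

      PeriodicMonitor(long recheckIntervalMs) {
        this.recheckIntervalMs = recheckIntervalMs;
      }

      void stop() {
        running.set(false);
      }

      @Override
      public void run() {
        while (running.get()) {
          try {
            computeWork();                   // stand-in for computeDatanodeWork()
            Thread.sleep(recheckIntervalMs); // wait until the next recheck
          } catch (InterruptedException ie) {
            break; // interrupt() from close() lands here; exit the loop
          } catch (RuntimeException e) {
            // recoverable failures are logged and the loop continues
            System.err.println("monitor iteration failed: " + e);
          }
        }
      }

      private void computeWork() {
        // placeholder for replication/invalidation scheduling
      }

      public static void main(String[] args) throws InterruptedException {
        PeriodicMonitor m = new PeriodicMonitor(3000L);
        Thread t = new Thread(m, "PeriodicMonitor");
        t.setDaemon(true); // like wrapping the runnable in org.apache.hadoop.util.Daemon
        t.start();
        Thread.sleep(10000L);
        m.stop();
        t.interrupt();
        t.join();
      }
    }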
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Jul 26 01:53:10 2011
@@ -18,8 +18,14 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,11 +34,23 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Manage datanodes, include decommission and other activities.
@@ -49,9 +67,29 @@ public class DatanodeManager {
/** Host names to datanode descriptors mapping. */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
+
+ private final DNSToSwitchMapping dnsToSwitchMapping;
+
+ /** Reader for the include/exclude host files. */
+ private final HostsFileReader hostsReader;
- DatanodeManager(final FSNamesystem namesystem) {
+ DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+ ) throws IOException {
this.namesystem = namesystem;
+ this.hostsReader = new HostsFileReader(
+ conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+ conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+
+ this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+ conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+
+ // If the DNS-to-switch mapping supports caching, resolve the network
+ // locations of the hosts in the include list and store the mapping
+ // in the cache, so that future calls to resolve will be fast.
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+ dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
+ }
}
private Daemon decommissionthread = null;
@@ -93,7 +131,7 @@ public class DatanodeManager {
}
/** Add a datanode. */
- public void addDatanode(final DatanodeDescriptor node) {
+ private void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
@@ -112,7 +150,7 @@ public class DatanodeManager {
}
/** Physically remove node from datanodeMap. */
- public void wipeDatanode(final DatanodeID node) throws IOException {
+ private void wipeDatanode(final DatanodeID node) throws IOException {
final String key = node.getStorageID();
synchronized (namesystem.datanodeMap) {
host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
@@ -123,4 +161,380 @@ public class DatanodeManager {
+ " is removed from datanodeMap.");
}
}
+
+ /* Resolve a node's network location */
+ private void resolveNetworkLocation (DatanodeDescriptor node) {
+ List<String> names = new ArrayList<String>(1);
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+ // get the node's IP address
+ names.add(node.getHost());
+ } else {
+ // get the node's host name
+ String hostName = node.getHostName();
+ int colon = hostName.indexOf(":");
+ hostName = (colon==-1)?hostName:hostName.substring(0,colon);
+ names.add(hostName);
+ }
+
+ // resolve its network location
+ List<String> rName = dnsToSwitchMapping.resolve(names);
+ String networkLocation;
+ if (rName == null) {
+ LOG.error("The resolve call returned null! Using " +
+ NetworkTopology.DEFAULT_RACK + " for host " + names);
+ networkLocation = NetworkTopology.DEFAULT_RACK;
+ } else {
+ networkLocation = rName.get(0);
+ }
+ node.setNetworkLocation(networkLocation);
+ }
+
+ private boolean inHostsList(DatanodeID node, String ipAddr) {
+ return checkInList(node, ipAddr, hostsReader.getHosts(), false);
+ }
+
+ private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
+ return checkInList(node, ipAddr, hostsReader.getExcludedHosts(), true);
+ }
+
+ /**
+ * Remove an already decommissioned data node that is in neither the include
+ * nor the exclude hosts list from the list of live or dead nodes. This is
+ * used to avoid displaying an already decommissioned data node to the
+ * operators. The procedure for hiding an already decommissioned data node
+ * is as follows:
+ * <ol>
+ * <li>
+ * The host must have been in the include hosts list, and the include hosts
+ * list must not be empty.
+ * </li>
+ * <li>
+ * The host is decommissioned by remaining in the include hosts list while
+ * being added to the exclude hosts list. The name node is updated with the
+ * new information by issuing the dfsadmin -refreshNodes command.
+ * </li>
+ * <li>
+ * The host is removed from both the include and exclude hosts lists. The
+ * name node is updated with the new information by issuing the dfsadmin
+ * -refreshNodes command.
+ * </li>
+ * </ol>
+ *
+ * @param nodeList the list of live or dead nodes
+ */
+ public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
+ // If the include list is empty, all nodes are welcome and it does not
+ // make sense to exclude any nodes from the cluster. Therefore, no removal.
+ if (hostsReader.getHosts().isEmpty()) {
+ return;
+ }
+
+ for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
+ && node.isDecommissioned()) {
+ // The include list is not empty, the datanode appears in neither the
+ // include nor the exclude list, and it has been decommissioned.
+ // Remove it from the node list.
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Check whether the given node (identified by DatanodeID or ipAddress) is
+ * in the (include or exclude) list. If ipAddress is null, check based only
+ * on the given DatanodeID. If ipAddress is not null, it should refer to the
+ * same host that the given DatanodeID refers to.
+ *
+ * @param node the host DatanodeID
+ * @param ipAddress if not null, should refer to the same host
+ * that the DatanodeID refers to
+ * @param hostsList the list of hosts in the include/exclude file
+ * @param isExcludeList true if this is the exclude list
+ * @return true if the node is in the list
+ */
+ private static boolean checkInList(final DatanodeID node,
+ final String ipAddress,
+ final Set<String> hostsList,
+ final boolean isExcludeList) {
+ final InetAddress iaddr;
+ if (ipAddress != null) {
+ try {
+ iaddr = InetAddress.getByName(ipAddress);
+ } catch (UnknownHostException e) {
+ LOG.warn("Unknown ip address: " + ipAddress, e);
+ return isExcludeList;
+ }
+ } else {
+ try {
+ iaddr = InetAddress.getByName(node.getHost());
+ } catch (UnknownHostException e) {
+ LOG.warn("Unknown host: " + node.getHost(), e);
+ return isExcludeList;
+ }
+ }
+
+ // if include list is empty, host is in include list
+ if ( (!isExcludeList) && (hostsList.isEmpty()) ){
+ return true;
+ }
+ return // compare ipaddress(:port)
+ (hostsList.contains(iaddr.getHostAddress().toString()))
+ || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+ + node.getPort()))
+ // compare hostname(:port)
+ || (hostsList.contains(iaddr.getHostName()))
+ || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+ || ((node instanceof DatanodeInfo) && hostsList
+ .contains(((DatanodeInfo) node).getHostName()));
+ }
+
+ /**
+ * Decommission the node if it is in the exclude list.
+ */
+ private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
+ throws IOException {
+ // If the registered node is in the exclude list, then decommission it
+ if (inExcludedHostsList(nodeReg, ipAddr)) {
+ namesystem.getBlockManager().startDecommission(nodeReg);
+ }
+ }
+
+
+ /**
+ * Generate new storage ID.
+ *
+ * @return unique storage ID
+ *
+ * Note that collisions are still possible if someone tries
+ * to bring in a data storage from a different cluster.
+ */
+ private String newStorageID() {
+ String newID = null;
+ while(newID == null) {
+ newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
+ if (namesystem.datanodeMap.get(newID) != null)
+ newID = null;
+ }
+ return newID;
+ }
+
+ public void registerDatanode(DatanodeRegistration nodeReg
+ ) throws IOException {
+ String dnAddress = Server.getRemoteAddress();
+ if (dnAddress == null) {
+ // Mostly called inside an RPC.
+ // But if not, use address passed by the data-node.
+ dnAddress = nodeReg.getHost();
+ }
+
+ // If the node is not in the hosts (include) list, it is
+ // disallowed from registering.
+ if (!inHostsList(nodeReg, dnAddress)) {
+ throw new DisallowedDatanodeException(nodeReg);
+ }
+
+ String hostName = nodeReg.getHost();
+
+ // update the datanode's name with ip:port
+ DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
+ nodeReg.getStorageID(),
+ nodeReg.getInfoPort(),
+ nodeReg.getIpcPort());
+ nodeReg.updateRegInfo(dnReg);
+ nodeReg.exportedKeys = namesystem.getBlockKeys();
+
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+ + "node registration from " + nodeReg.getName()
+ + " storage " + nodeReg.getStorageID());
+
+ DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+ DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
+
+ if (nodeN != null && nodeN != nodeS) {
+ NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+ + "node from name: " + nodeN.getName());
+ // nodeN previously served a different data storage,
+ // which is not served by anybody anymore.
+ namesystem.removeDatanode(nodeN);
+ // physically remove node from datanodeMap
+ wipeDatanode(nodeN);
+ nodeN = null;
+ }
+
+ if (nodeS != null) {
+ if (nodeN == nodeS) {
+ // The same datanode has just been restarted to serve the same data
+ // storage. We do not need to remove old data blocks, the delta will
+ // be calculated on the next block report from the datanode
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+ + "node restarted.");
+ }
+ } else {
+ // nodeS is found
+ /* The registering datanode is a replacement node for the existing
+ data storage, which from now on will be served by a new node.
+ If this message repeats, both nodes might have the same storageID
+ by (insanely rare) random chance. The user needs to restart one of
+ the nodes with its data cleared, or remove the StorageID value from
+ the "VERSION" file under the data directory of the datanode (though
+ this might not work if the VERSION file format has changed).
+ */
+ NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+ + "node " + nodeS.getName()
+ + " is replaced by " + nodeReg.getName() +
+ " with the same storageID " +
+ nodeReg.getStorageID());
+ }
+ // update cluster map
+ getNetworkTopology().remove(nodeS);
+ nodeS.updateRegInfo(nodeReg);
+ nodeS.setHostName(hostName);
+ nodeS.setDisallowed(false); // Node is in the include list
+
+ // resolve network location
+ resolveNetworkLocation(nodeS);
+ getNetworkTopology().add(nodeS);
+
+ // also treat the registration message as a heartbeat
+ synchronized(namesystem.heartbeats) {
+ if( !namesystem.heartbeats.contains(nodeS)) {
+ namesystem.heartbeats.add(nodeS);
+ //update its timestamp
+ nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+ nodeS.isAlive = true;
+ }
+ }
+ checkDecommissioning(nodeS, dnAddress);
+ return;
+ }
+
+ // this is a new datanode serving a new data storage
+ if (nodeReg.getStorageID().equals("")) {
+ // this data storage has never been registered
+ // it is either empty or was created by a pre-storageID version of DFS
+ nodeReg.storageID = newStorageID();
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.registerDatanode: "
+ + "new storageID " + nodeReg.getStorageID() + " assigned.");
+ }
+ }
+ // register new datanode
+ DatanodeDescriptor nodeDescr
+ = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
+ resolveNetworkLocation(nodeDescr);
+ addDatanode(nodeDescr);
+ checkDecommissioning(nodeDescr, dnAddress);
+
+ // also treat the registration message as a heartbeat
+ synchronized(namesystem.heartbeats) {
+ namesystem.heartbeats.add(nodeDescr);
+ nodeDescr.isAlive = true;
+ // no need to update its timestamp
+ // because it is done when the descriptor is created
+ }
+ }
+
+ /** Reread include/exclude files. */
+ public void refreshHostsReader(Configuration conf) throws IOException {
+ // Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
+ // Update the file names and refresh internal includes and excludes list.
+ if (conf == null) {
+ conf = new HdfsConfiguration();
+ }
+ hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+ conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+ hostsReader.refresh();
+ }
+
+ /**
+ * Rereads the config to get hosts and exclude list file names.
+ * Rereads the files to update the hosts and exclude lists. It
+ * checks if any of the hosts have changed states:
+ * 1. Added to hosts --> no further work needed here.
+ * 2. Removed from hosts --> mark AdminState as decommissioned.
+ * 3. Added to exclude --> start decommission.
+ * 4. Removed from exclude --> stop decommission.
+ */
+ public void refreshDatanodes() throws IOException {
+ for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+ // Check if the node is not in the include list.
+ if (!inHostsList(node, null)) {
+ node.setDisallowed(true); // case 2.
+ } else {
+ if (inExcludedHostsList(node, null)) {
+ namesystem.getBlockManager().startDecommission(node); // case 3.
+ } else {
+ namesystem.getBlockManager().stopDecommission(node); // case 4.
+ }
+ }
+ }
+ }
+
+ /** For generating datanode reports */
+ public List<DatanodeDescriptor> getDatanodeListForReport(
+ final DatanodeReportType type) {
+ boolean listLiveNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.LIVE;
+ boolean listDeadNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.DEAD;
+
+ HashMap<String, String> mustList = new HashMap<String, String>();
+
+ if (listDeadNodes) {
+ // First load all the nodes listed in the include and exclude files.
+ Iterator<String> it = hostsReader.getHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
+ it = hostsReader.getExcludedHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
+ }
+
+ ArrayList<DatanodeDescriptor> nodes = null;
+
+ synchronized (namesystem.datanodeMap) {
+ nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() +
+ mustList.size());
+ Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn = it.next();
+ boolean isDead = namesystem.isDatanodeDead(dn);
+ if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+ nodes.add(dn);
+ }
+ // Remove any form of this datanode from the include/exclude lists.
+ try {
+ InetAddress inet = InetAddress.getByName(dn.getHost());
+ // compare hostname(:port)
+ mustList.remove(inet.getHostName());
+ mustList.remove(inet.getHostName()+":"+dn.getPort());
+ // compare ipaddress(:port)
+ mustList.remove(inet.getHostAddress().toString());
+ mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+ } catch ( UnknownHostException e ) {
+ mustList.remove(dn.getName());
+ mustList.remove(dn.getHost());
+ LOG.warn(e);
+ }
+ }
+ }
+
+ if (listDeadNodes) {
+ Iterator<String> it = mustList.keySet().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn =
+ new DatanodeDescriptor(new DatanodeID(it.next()));
+ dn.setLastUpdate(0);
+ nodes.add(dn);
+ }
+ }
+ return nodes;
+ }
}
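Stepping back from the diff, the include/exclude rule implemented by checkInList() can be summarized: an empty include list admits every host, an empty exclude list excludes nobody, and otherwise membership is tested against the several ip/hostname(:port) spellings a hosts file may contain. A simplified, self-contained sketch of that rule (a hypothetical helper taking pre-resolved names; the real method also handles DNS-resolution failures and DatanodeInfo host names):

    import java.util.Set;

    // Simplified membership check in the spirit of DatanodeManager.checkInList().
    final class HostListCheck {
      private HostListCheck() {}

      static boolean inList(Set<String> hostsList, String hostName,
          String hostAddress, int port, boolean isExcludeList) {
        // The empty-list default differs per list: an empty include list
        // admits all hosts, while an empty exclude list excludes none.
        if (hostsList.isEmpty()) {
          return !isExcludeList;
        }
        // Accept any spelling the hosts file may use:
        // ip, ip:port, hostname, hostname:port.
        return hostsList.contains(hostAddress)
            || hostsList.contains(hostAddress + ":" + port)
            || hostsList.contains(hostName)
            || hostsList.contains(hostName + ":" + port);
      }
    }

refreshDatanodes() then reduces each registered node to the four cases listed in its javadoc by running this check against both lists.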
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Tue Jul 26 01:53:10 2011
@@ -35,9 +35,11 @@ class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem;
+ private final BlockManager blockManager;
DecommissionManager(FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
+ this.blockManager = fsnamesystem.getBlockManager();
}
/** Periodically check decommission status. */
@@ -88,7 +90,7 @@ class DecommissionManager {
if (d.isDecommissionInProgress()) {
try {
- fsnamesystem.checkDecommissionStateInternal(d);
+ blockManager.checkDecommissionStateInternal(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jul 26 01:53:10 2011
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
@@ -487,8 +487,8 @@ public class JspHelper {
if (namenodeAddressInUrl != null) {
namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
} else if (context != null) {
- namenodeAddress = (InetSocketAddress) context
- .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+ namenodeAddress = NameNodeHttpServer.getNameNodeAddressFromContext(
+ context);
}
if (namenodeAddress != null) {
return (namenodeAddress.getAddress().getHostAddress() + ":"
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Jul 26 01:53:10 2011
@@ -99,23 +99,16 @@ public class FSDataset implements FSCons
}
} else {
File[] files = FileUtil.listFiles(dir);
- int numChildren = 0;
+ List<FSDir> dirList = new ArrayList<FSDir>();
for (int idx = 0; idx < files.length; idx++) {
if (files[idx].isDirectory()) {
- numChildren++;
+ dirList.add(new FSDir(files[idx]));
} else if (Block.isBlockFilename(files[idx])) {
numBlocks++;
}
}
- if (numChildren > 0) {
- children = new FSDir[numChildren];
- int curdir = 0;
- for (int idx = 0; idx < files.length; idx++) {
- if (files[idx].isDirectory()) {
- children[curdir] = new FSDir(files[idx]);
- curdir++;
- }
- }
+ if (dirList.size() > 0) {
+ children = dirList.toArray(new FSDir[dirList.size()]);
}
}
}
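The HDFS-1774 hunk above replaces a two-pass count-then-fill loop with a single pass that collects into a list and converts once at the end. The same idiom on plain java.io.File, as a generic illustration (the names are made up; the real code builds FSDir children):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    class DirScan {
      // Single-pass collection plus one toArray() call, replacing the old
      // pattern of counting matches first and filling a pre-sized array
      // in a second loop over the same listing.
      static File[] childDirectories(File dir) {
        File[] files = dir.listFiles();
        if (files == null) {
          return new File[0]; // not a directory, or not readable
        }
        List<File> dirs = new ArrayList<File>();
        for (File f : files) {
          if (f.isDirectory()) {
            dirs.add(f);
          }
        }
        return dirs.toArray(new File[dirs.size()]);
      }
    }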
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java Tue Jul 26 01:53:10 2011
@@ -41,13 +41,24 @@ public class RoundRobinVolumesPolicy imp
}
int startVolume = curVolume;
+ long maxAvailable = 0;
while (true) {
FSVolume volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
- if (volume.getAvailable() > blockSize) { return volume; }
+ long availableVolumeSize = volume.getAvailable();
+ if (availableVolumeSize > blockSize) { return volume; }
+
+ if (availableVolumeSize > maxAvailable) {
+ maxAvailable = availableVolumeSize;
+ }
+
if (curVolume == startVolume) {
- throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+ throw new DiskOutOfSpaceException(
+ "Insufficient space for an additional block. Volume with the most available space has "
+ + maxAvailable
+ + " bytes free, configured block size is "
+ + blockSize);
}
}
}
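The HDFS-1739 change is easiest to see in isolation: the round-robin scan is unchanged, but the loop now remembers the largest free space it saw so the failure message can report it. A self-contained sketch with a stand-in Volume interface and a plain IOException in place of DiskOutOfSpaceException (the real FSVolume API differs):

    import java.io.IOException;
    import java.util.List;

    interface Volume {
      long getAvailable() throws IOException; // stand-in for FSVolume.getAvailable()
    }

    class RoundRobinChooser {
      private int curVolume = 0;

      synchronized Volume chooseVolume(List<Volume> volumes, long blockSize)
          throws IOException {
        if (volumes.isEmpty()) {
          throw new IOException("no volumes configured");
        }
        int startVolume = curVolume;
        long maxAvailable = 0; // largest free space seen; used only in the error
        while (true) {
          Volume volume = volumes.get(curVolume);
          curVolume = (curVolume + 1) % volumes.size();
          long available = volume.getAvailable();
          if (available > blockSize) {
            return volume; // first volume in rotation with room for the block
          }
          maxAvailable = Math.max(maxAvailable, available);
          if (curVolume == startVolume) {
            // full rotation without a fit: report the best candidate we saw
            throw new IOException("Insufficient space for an additional block."
                + " Volume with the most available space has " + maxAvailable
                + " bytes free, configured block size is " + blockSize);
          }
        }
      }
    }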
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Jul 26 01:53:10 2011
@@ -110,10 +110,10 @@ public class BackupNode extends NameNode
String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr);
}
-
+
@Override // NameNode
protected void setHttpServerAddress(Configuration conf){
- conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress));
+ conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(getHttpAddress()));
}
@Override // NameNode
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -46,8 +45,7 @@ public class CancelDelegationTokenServle
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
- final Configuration conf =
- (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
@@ -57,7 +55,8 @@ public class CancelDelegationTokenServle
"Unable to identify or authenticate user");
return;
}
- final NameNode nn = (NameNode) context.getAttribute("name.node");
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+ context);
String tokenString = req.getParameter(TOKEN);
if (tokenString == null) {
resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Jul 26 01:53:10 2011
@@ -105,11 +105,6 @@ class Checkpointer extends Daemon {
String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT);
infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":"));
-
- HttpServer httpServer = backupNode.httpServer;
- httpServer.setAttribute("name.system.image", getFSImage());
- httpServer.setAttribute("name.conf", conf);
- httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
"(" + checkpointPeriod/60 + " min)");
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Tue Jul 26 01:53:10 2011
@@ -75,13 +75,14 @@ abstract class DfsServlet extends HttpSe
ServletContext context = getServletContext();
// if we are running in the Name Node, use it directly rather than via
// rpc
- NameNode nn = (NameNode) context.getAttribute("name.node");
+ NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
if (nn != null) {
return nn;
}
- InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
+ InetSocketAddress nnAddr =
+ NameNodeHttpServer.getNameNodeAddressFromContext(context);
Configuration conf = new HdfsConfiguration(
- (Configuration)context.getAttribute(JspHelper.CURRENT_CONF));
+ NameNodeHttpServer.getConfFromContext(context));
return DFSUtil.createNamenode(nnAddr, conf);
}
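This servlet-side cleanup, repeated in CancelDelegationTokenServlet and JspHelper above, swaps string literals such as "name.node" for typed static accessors on the HTTP-server class. A hedged sketch of that accessor style, with hypothetical keys and java.util.Properties standing in for Hadoop's Configuration:

    import java.net.InetSocketAddress;
    import java.util.Properties;
    import javax.servlet.ServletContext;

    // Hypothetical accessor class modeled on the NameNodeHttpServer refactor:
    // the attribute keys live in one place and callers get typed helpers
    // instead of repeating string literals and casts in every servlet.
    final class HttpServerContext {
      static final String ADDRESS_KEY = "example.nn.address"; // assumed key
      static final String CONF_KEY = "example.nn.conf";       // assumed key

      private HttpServerContext() {}

      static void initialize(ServletContext context,
          InetSocketAddress addr, Properties conf) {
        context.setAttribute(ADDRESS_KEY, addr);
        context.setAttribute(CONF_KEY, conf);
      }

      static InetSocketAddress getAddressFromContext(ServletContext context) {
        return (InetSocketAddress) context.getAttribute(ADDRESS_KEY);
      }

      // Properties stands in for org.apache.hadoop.conf.Configuration here.
      static Properties getConfFromContext(ServletContext context) {
        return (Properties) context.getAttribute(CONF_KEY);
      }
    }

The cast happens once, inside the accessor, so a key rename or a type change is caught in a single place rather than in every servlet that reads the context.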
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jul 26 01:53:10 2011
@@ -152,7 +152,7 @@ public class FSDirectory implements Clos
}
private BlockManager getBlockManager() {
- return getFSNamesystem().blockManager;
+ return getFSNamesystem().getBlockManager();
}
/**