Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 03:53:21 UTC

svn commit: r1150969 [1/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/or...

Author: todd
Date: Tue Jul 26 01:53:10 2011
New Revision: 1150969

URL: http://svn.apache.org/viewvc?rev=1150969&view=rev
Log:
Merge hdfs and common trunk into branch

Added:
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
      - copied unchanged from r1150966, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
      - copied unchanged from r1150966, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
Modified:
    hadoop/common/branches/HDFS-1073/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java
    hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/block_info_xml.jsp
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/corrupt_files.jsp
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/corrupt_replicas_xml.jsp
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/dfshealth.jsp
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/dfsnodelist.jsp
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/secondary/   (props changed)

Propchange: hadoop/common/branches/HDFS-1073/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs:1134994-1148523
+/hadoop/common/trunk/hdfs:1134994-1150966
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt Tue Jul 26 01:53:10 2011
@@ -294,6 +294,9 @@ Trunk (unreleased changes)
    
     HDFS-2083. Query JMX statistics over http via JMXJsonServlet. (tanping)
 
+    HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major 
+    version for common and hdfs. (eyang via omalley)
+
   IMPROVEMENTS
 
     HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
@@ -579,6 +582,25 @@ Trunk (unreleased changes)
     and Random object creation to DFSUtil; move DFSClient.stringifyToken(..)
     to DelegationTokenIdentifier.  (szetszwo)
 
+    HDFS-1774. Small optimization to FSDataset. (Uma Maheswara Rao G via eli)
+
+    HDFS-2167.  Move dnsToSwitchMapping and hostsReader from FSNamesystem to
+    DatanodeManager.  (szetszwo)
+
+    HDFS-2116. Use Mokito in TestStreamFile and TestByteRangeInputStream.
+    (Plamen Jeliazkov via shv)
+
+    HDFS-2112.  Move ReplicationMonitor to block management.  (Uma Maheswara
+    Rao G via szetszwo)
+
+    HDFS-1739.  Add available volume size to the error message when datanode
+    throws DiskOutOfSpaceException.  (Uma Maheswara Rao G via szetszwo)
+
+    HDFS-2144. If SNN shuts down during initialization it does not log the
+    cause. (Ravi Prakash via atm)
+
+    HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -859,6 +881,11 @@ Trunk (unreleased changes)
     HDFS-2152. TestWriteConfigurationToDFS causing the random failures. (Uma
     Maheswara Rao G via atm)
 
+    HDFS-2114. re-commission of a decommissioned node does not delete 
+    excess replicas. (John George via mattf)
+
+    HDFS-1776. Bug in Concat code. (Bharath Mundlapudi via Dmytro Molkov)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1148523
+/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1150966
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1148523
+/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1150966
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/HdfsProxy.java Tue Jul 26 01:53:10 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 
 /**
  * A HTTPS/SSL proxy to HDFS, implementing certificate based access control.
@@ -70,7 +71,7 @@ public class HdfsProxy {
 
     this.server = new ProxyHttpServer(sslAddr, sslConf);
     this.server.setAttribute("proxy.https.port", server.getPort());
-    this.server.setAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY, nnAddr);
+    this.server.setAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY, nnAddr);
     this.server.setAttribute(JspHelper.CURRENT_CONF, new HdfsConfiguration());
     this.server.addGlobalFilter("ProxyFilter", ProxyFilter.class.getName(), null);
     this.server.addServlet("listPaths", "/listPaths/*", ProxyListPathsServlet.class);

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java Tue Jul 26 01:53:10 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.FileDataServlet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /** {@inheritDoc} */
@@ -47,7 +48,7 @@ public class ProxyFileDataServlet extend
       dtParam=JspHelper.getDelegationTokenUrlParam(dt);
     }
     InetSocketAddress nnAddress = (InetSocketAddress) getServletContext()
-        .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+        .getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
     String nnHostPort = nnAddress == null ? null : NameNode
         .getHostPortString(nnAddress);
     String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS,
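
The two hunks above are fallout from HDFS-2180: the servlet-context attribute carrying the NameNode RPC address is now keyed by a constant owned by the new NameNodeHttpServer class instead of NameNode. A minimal standalone sketch of that attribute-key pattern follows, using illustrative names and a plain Map in place of the servlet context; this is not the actual Hadoop API.

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the attribute-key pattern behind the hunks above: the
 * HTTP-server class owns the key under which the NameNode RPC address is
 * published, and every consumer reads it back through that same constant,
 * so moving the constant only requires one-line changes in the consumers.
 * Names are illustrative and a plain Map stands in for the servlet context.
 */
class HttpServerSketch {
  // Key owned by the HTTP-server class; consumers never hard-code the string.
  static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";

  static void publish(Map<String, Object> context, InetSocketAddress rpcAddr) {
    context.put(NAMENODE_ADDRESS_ATTRIBUTE_KEY, rpcAddr);
  }

  static InetSocketAddress addressFromContext(Map<String, Object> context) {
    return (InetSocketAddress) context.get(NAMENODE_ADDRESS_ATTRIBUTE_KEY);
  }

  public static void main(String[] args) {
    Map<String, Object> servletContext = new HashMap<>();
    publish(servletContext, new InetSocketAddress("nn.example.com", 8020));
    System.out.println("namenode address = " + addressFromContext(servletContext));
  }
}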

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/java:1134994-1148523
+/hadoop/common/trunk/hdfs/src/java:1134994-1150966
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Jul 26 01:53:10 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -33,15 +35,16 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.Daemon;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -98,6 +102,9 @@ public class BlockManager {
     return excessBlocksCount;
   }
 
+  /** replicationRecheckInterval is how often the namenode checks for new replication work */
+  private final long replicationRecheckInterval;
+  
   /**
    * Mapping: Block -> { INode, datanodes, self ref }
    * Updated only in response to client-sent information.
@@ -105,11 +112,12 @@ public class BlockManager {
   public final BlocksMap blocksMap;
 
   private final DatanodeManager datanodeManager;
-
-  //
-  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
-  //
-  private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  
+  /** Replication thread. */
+  final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+  
+  /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
+  final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   //
   // Keeps a Collection for every named machine containing
@@ -136,34 +144,33 @@ public class BlockManager {
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
   private final PendingReplicationBlocks pendingReplications;
 
-  //  The maximum number of replicas allowed for a block
+  /** The maximum number of replicas allowed for a block */
   public final int maxReplication;
-  //  How many outgoing replication streams a given node should have at one time
+  /** The maximum number of outgoing replication streams
+   *  a given node should have at one time 
+   */
   public int maxReplicationStreams;
-  // Minimum copies needed or else write is disallowed
+  /** Minimum copies needed or else write is disallowed */
   public final int minReplication;
-  // Default number of replicas
+  /** Default number of replicas */
   public final int defaultReplication;
-  // How many entries are returned by getCorruptInodes()
+  /** The maximum number of entries returned by getCorruptInodes() */
   final int maxCorruptFilesReturned;
   
-  // variable to enable check for enough racks 
+  /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
-  /**
-   * Last block index used for replication work.
-   */
+  /** Last block index used for replication work. */
   private int replIndex = 0;
 
-  // for block replicas placement
-  public final BlockPlacementPolicy replicator;
+  /** for block replicas placement */
+  private BlockPlacementPolicy blockplacement;
 
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
-    datanodeManager = new DatanodeManager(fsn);
-
+    datanodeManager = new DatanodeManager(fsn, conf);
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
-    replicator = BlockPlacementPolicy.getInstance(
+    blockplacement = BlockPlacementPolicy.getInstance(
         conf, namesystem, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -197,22 +204,29 @@ public class BlockManager {
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
     this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
                                                                              : true;
+    
+    this.replicationRecheckInterval = 
+      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+                  DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
     FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
     FSNamesystem.LOG.info("maxReplication = " + maxReplication);
     FSNamesystem.LOG.info("minReplication = " + minReplication);
     FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
     FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
+    FSNamesystem.LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
   }
 
   public void activate(Configuration conf) {
     pendingReplications.start();
     datanodeManager.activate(conf);
+    this.replicationThread.start();
   }
 
   public void close() {
     if (pendingReplications != null) pendingReplications.stop();
     blocksMap.close();
     datanodeManager.close();
+    if (replicationThread != null) replicationThread.interrupt();
   }
 
   /** @return the datanodeManager */
@@ -220,6 +234,19 @@ public class BlockManager {
     return datanodeManager;
   }
 
+  /** @return the BlockPlacementPolicy */
+  public BlockPlacementPolicy getBlockPlacementPolicy() {
+    return blockplacement;
+  }
+
+  /** Set BlockPlacementPolicy */
+  public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
+    if (newpolicy == null) {
+      throw new HadoopIllegalArgumentException("newpolicy == null");
+    }
+    this.blockplacement = newpolicy;
+  }
+
   public void metaSave(PrintWriter out) {
     //
     // Dump contents of neededReplication
@@ -551,7 +578,7 @@ public class BlockManager {
     }
   }
   
-  void removeFromInvalidates(String storageID, Block block) {
+  private void removeFromInvalidates(String storageID, Block block) {
     Collection<Block> v = recentInvalidateSets.get(storageID);
     if (v != null && v.remove(block)) {
       pendingDeletionBlocksCount--;
@@ -921,7 +948,7 @@ public class BlockManager {
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
-                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       blockplacement.chooseTarget(fileINode, additionalReplRequired,
                        srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
@@ -1021,7 +1048,7 @@ public class BlockManager {
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = replicator.chooseTarget(
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
         src, numOfReplicas, client, excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
@@ -1240,7 +1267,7 @@ public class BlockManager {
     }
   }
 
-  void reportDiff(DatanodeDescriptor dn, 
+  private void reportDiff(DatanodeDescriptor dn, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1670,7 +1697,7 @@ public class BlockManager {
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint, replicator);
+        addedNode, delNodeHint, blockplacement);
   }
 
   public void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1694,7 +1721,7 @@ public class BlockManager {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
           + block + " from " + node.getName());
@@ -1878,10 +1905,30 @@ public class BlockManager {
   }
   
   /**
+   * On stopping decommission, check if the node has excess replicas.
+   * If there are any excess replicas, call processOverReplicatedBlock()
+   */
+  private void processOverReplicatedBlocksOnReCommission(
+      final DatanodeDescriptor srcNode) {
+    final Iterator<? extends Block> it = srcNode.getBlockIterator();
+    while(it.hasNext()) {
+      final Block block = it.next();
+      INodeFile fileINode = blocksMap.getINode(block);
+      short expectedReplication = fileINode.getReplication();
+      NumberReplicas num = countNodes(block);
+      int numCurrentReplica = num.liveReplicas();
+      if (numCurrentReplica > expectedReplication) {
+        // over-replicated block 
+        processOverReplicatedBlock(block, expectedReplication, null, null);
+      }
+    }
+  }
+
+  /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
-  public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+  boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
     int underReplicatedBlocks = 0;
     int decommissionOnlyReplicas = 0;
@@ -2003,7 +2050,7 @@ public class BlockManager {
   }
 
   /** Remove a datanode from the invalidatesSet */
-  public void removeFromInvalidates(String storageID) {
+  private void removeFromInvalidates(String storageID) {
     Collection<Block> blocks = recentInvalidateSets.remove(storageID);
     if (blocks != null) {
       pendingDeletionBlocksCount -= blocks.size();
@@ -2067,28 +2114,6 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
   }
-  
-  //Returns the number of racks over which a given block is replicated
-  //decommissioning/decommissioned nodes are not counted. corrupt replicas 
-  //are also ignored
-  public int getNumberOfRacks(Block b) {
-    HashSet<String> rackSet = new HashSet<String>(0);
-    Collection<DatanodeDescriptor> corruptNodes = 
-                                  corruptReplicas.getNodes(b);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          String rackName = cur.getNetworkLocation();
-          if (!rackSet.contains(rackName)) {
-            rackSet.add(rackName);
-          }
-        }
-      }
-    }
-    return rackSet.size();
-  }
 
   boolean blockHasEnoughRacks(Block b) {
     if (!this.shouldCheckForEnoughRacks) {
@@ -2190,4 +2215,118 @@ public class BlockManager {
     return neededReplications
         .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
+
+  /**
+   * Change, if appropriate, the admin state of a datanode to 
+   * decommission completed. Return true if decommission is complete.
+   */
+  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+    // Check to see if all blocks in this decommissioned
+    // node have reached their target replication factor.
+    if (node.isDecommissionInProgress()) {
+      if (!isReplicationInProgress(node)) {
+        node.setDecommissioned();
+        LOG.info("Decommission complete for node " + node.getName());
+      }
+    }
+    return node.isDecommissioned();
+  }
+
+  /** Start decommissioning the specified datanode. */
+  void startDecommission(DatanodeDescriptor node) throws IOException {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
+          node.numBlocks() +  " blocks.");
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.startDecommission();
+        namesystem.updateStats(node, true);
+      }
+      node.decommissioningStatus.setStartTime(now());
+      
+      // all the blocks that reside on this node have to be replicated.
+      checkDecommissionStateInternal(node);
+    }
+  }
+
+  /** Stop decommissioning the specified datanodes. */
+  void stopDecommission(DatanodeDescriptor node) throws IOException {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.stopDecommission();
+        namesystem.updateStats(node, true);
+      }
+      processOverReplicatedBlocksOnReCommission(node);
+    }
+  }
+
+  /**
+   * Periodically calls computeReplicationWork().
+   */
+  private class ReplicationMonitor implements Runnable {
+    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+
+    @Override
+    public void run() {
+      while (namesystem.isRunning()) {
+        try {
+          computeDatanodeWork();
+          processPendingReplications();
+          Thread.sleep(replicationRecheckInterval);
+        } catch (InterruptedException ie) {
+          LOG.warn("ReplicationMonitor thread received InterruptedException.", ie);
+          break;
+        } catch (IOException ie) {
+          LOG.warn("ReplicationMonitor thread received exception. " , ie);
+        } catch (Throwable t) {
+          LOG.warn("ReplicationMonitor thread received Runtime exception. ", t);
+          Runtime.getRuntime().exit(-1);
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Compute block replication and block invalidation work that can be scheduled
+   * on data-nodes. The datanode will be informed of this work at the next
+   * heartbeat.
+   * 
+   * @return number of blocks scheduled for replication or removal.
+   * @throws IOException
+   */
+  int computeDatanodeWork() throws IOException {
+    int workFound = 0;
+    int blocksToProcess = 0;
+    int nodesToProcess = 0;
+    // Blocks should not be replicated or removed if in safe mode.
+    // It's OK to check safe mode here w/o holding lock, in the worst
+    // case extra replications will be scheduled, and these will get
+    // fixed up later.
+    if (namesystem.isInSafeMode())
+      return workFound;
+
+    synchronized (namesystem.heartbeats) {
+      blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+      nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
+          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
+    }
+
+    workFound = this.computeReplicationWork(blocksToProcess);
+
+    // Update FSNamesystemMetrics counters
+    namesystem.writeLock();
+    try {
+      this.updateState();
+      this.scheduledReplicationBlocksCount = workFound;
+    } finally {
+      namesystem.writeUnlock();
+    }
+    workFound += this.computeInvalidateWork(nodesToProcess);
+    return workFound;
+  }
+
 }
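
The bulk of this hunk is HDFS-2112: the ReplicationMonitor daemon and computeDatanodeWork() move from FSNamesystem into BlockManager, which now starts the thread in activate() and interrupts it in close(). Below is a simplified, self-contained sketch of that monitor pattern; names and the placeholder methods are illustrative only (JDK classes, not the Hadoop implementation).

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simplified sketch of the ReplicationMonitor pattern introduced above: a
 * daemon thread wakes up every recheck interval, sizes its work by the
 * number of live datanodes, and exits the JVM on unexpected errors so a
 * broken monitor does not limp along silently.
 */
class ReplicationMonitorSketch implements Runnable {
  static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
  static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;

  private final AtomicBoolean running = new AtomicBoolean(true);
  private final long recheckIntervalMs;

  ReplicationMonitorSketch(long recheckIntervalMs) {
    this.recheckIntervalMs = recheckIntervalMs;
  }

  @Override
  public void run() {
    while (running.get()) {
      try {
        computeDatanodeWork();
        Thread.sleep(recheckIntervalMs);
      } catch (InterruptedException ie) {
        break;                          // normal shutdown path (close() interrupts)
      } catch (Throwable t) {
        t.printStackTrace();
        Runtime.getRuntime().exit(-1);  // unexpected error: fail fast
      }
    }
  }

  private boolean inSafeMode() { return false; }  // placeholder for namesystem state
  private int liveDatanodes()  { return 10; }     // placeholder for heartbeats.size()

  /** Size replication and invalidation work by cluster size, as in the hunk above. */
  int computeDatanodeWork() {
    if (inSafeMode()) {
      return 0;  // no replication or invalidation work is scheduled in safe mode
    }
    int nodes = liveDatanodes();
    int blocksToProcess = (int) (nodes * REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
    int nodesToProcess = (int) Math.ceil(nodes * INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
    System.out.println("blocksToProcess=" + blocksToProcess
        + " nodesToProcess=" + nodesToProcess);
    return blocksToProcess + nodesToProcess;
  }

  void stop() { running.set(false); }

  public static void main(String[] args) throws InterruptedException {
    ReplicationMonitorSketch monitor = new ReplicationMonitorSketch(3000L);
    Thread t = new Thread(monitor, "ReplicationMonitorSketch");
    t.setDaemon(true);
    t.start();
    Thread.sleep(100L);
    monitor.stop();
    t.interrupt();
  }
}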

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Jul 26 01:53:10 2011
@@ -18,8 +18,14 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,11 +34,23 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Manage datanodes, include decommission and other activities.
@@ -49,9 +67,29 @@ public class DatanodeManager {
 
   /** Host names to datanode descriptors mapping. */
   private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
+
+  private final DNSToSwitchMapping dnsToSwitchMapping;
+
+  /** Read include/exclude files*/
+  private final HostsFileReader hostsReader; 
   
-  DatanodeManager(final FSNamesystem namesystem) {
+  DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+      ) throws IOException {
     this.namesystem = namesystem;
+    this.hostsReader = new HostsFileReader(
+        conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
+            ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+    
+    // If the dns-to-switch mapping supports caching, resolve the network
+    // locations of those hosts in the include list and store the mapping
+    // in the cache, so future calls to resolve will be fast.
+    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
+    }
   }
 
   private Daemon decommissionthread = null;
@@ -93,7 +131,7 @@ public class DatanodeManager {
   }
 
   /** Add a datanode. */
-  public void addDatanode(final DatanodeDescriptor node) {
+  private void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
@@ -112,7 +150,7 @@ public class DatanodeManager {
   }
 
   /** Physically remove node from datanodeMap. */
-  public void wipeDatanode(final DatanodeID node) throws IOException {
+  private void wipeDatanode(final DatanodeID node) throws IOException {
     final String key = node.getStorageID();
     synchronized (namesystem.datanodeMap) {
       host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
@@ -123,4 +161,380 @@ public class DatanodeManager {
           + " is removed from datanodeMap.");
     }
   }
+
+  /* Resolve a node's network location */
+  private void resolveNetworkLocation (DatanodeDescriptor node) {
+    List<String> names = new ArrayList<String>(1);
+    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+      // get the node's IP address
+      names.add(node.getHost());
+    } else {
+      // get the node's host name
+      String hostName = node.getHostName();
+      int colon = hostName.indexOf(":");
+      hostName = (colon==-1)?hostName:hostName.substring(0,colon);
+      names.add(hostName);
+    }
+    
+    // resolve its network location
+    List<String> rName = dnsToSwitchMapping.resolve(names);
+    String networkLocation;
+    if (rName == null) {
+      LOG.error("The resolve call returned null! Using " + 
+          NetworkTopology.DEFAULT_RACK + " for host " + names);
+      networkLocation = NetworkTopology.DEFAULT_RACK;
+    } else {
+      networkLocation = rName.get(0);
+    }
+    node.setNetworkLocation(networkLocation);
+  }
+
+  private boolean inHostsList(DatanodeID node, String ipAddr) {
+     return checkInList(node, ipAddr, hostsReader.getHosts(), false);
+  }
+  
+  private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
+    return checkInList(node, ipAddr, hostsReader.getExcludedHosts(), true);
+  }
+
+  /**
+   * Remove an already decommissioned data node that is neither in the include
+   * nor the exclude hosts lists from the list of live or dead nodes.  This is
+   * used so that an already decommissioned data node is not displayed to the
+   * operators. The procedure for hiding an already decommissioned data node
+   * is as follows:
+   * <ol>
+   *   <li> 
+   *   Host must have been in the include hosts list and the include hosts list
+   *   must not be empty.
+   *   </li>
+   *   <li>
+   *   Host is decommissioned by remaining in the include hosts list and added
+   *   into the exclude hosts list. Name node is updated with the new 
+   *   information by issuing dfsadmin -refreshNodes command.
+   *   </li>
+   *   <li>
+   *   Host is removed from both the include and exclude hosts lists.  Name 
+   *   node is updated with the new information by issuing the dfsadmin -refreshNodes 
+   *   command.
+   *   </li>
+   * </ol>
+   * 
+   * @param nodeList
+   *          array list of live or dead nodes.
+   */
+  public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
+    // If the include list is empty, any nodes are welcomed and it does not
+    // make sense to exclude any nodes from the cluster. Therefore, no remove.
+    if (hostsReader.getHosts().isEmpty()) {
+      return;
+    }
+    
+    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
+      DatanodeDescriptor node = it.next();
+      if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
+          && node.isDecommissioned()) {
+        // Include list is not empty, an existing datanode does not appear
+        // in both include or exclude lists and it has been decommissioned.
+        // Remove it from the node list.
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Check if the given node (of DatanodeID or ipAddress) is in the (include or
+   * exclude) list.  If ipAddress is null, check only based upon the given 
+   * DatanodeID.  If ipAddress is not null, it should refer to the
+   * same host that the given DatanodeID refers to.
+   * 
+   * @param node, the host DatanodeID
+   * @param ipAddress, if not null, should refer to the same host
+   *                   that DatanodeID refers to
+   * @param hostsList, the list of hosts in the include/exclude file
+   * @param isExcludeList, boolean, true if this is the exclude list
+   * @return boolean, if in the list
+   */
+  private static boolean checkInList(final DatanodeID node,
+      final String ipAddress,
+      final Set<String> hostsList,
+      final boolean isExcludeList) {
+    final InetAddress iaddr;
+    if (ipAddress != null) {
+      try {
+        iaddr = InetAddress.getByName(ipAddress);
+      } catch (UnknownHostException e) {
+        LOG.warn("Unknown ip address: " + ipAddress, e);
+        return isExcludeList;
+      }
+    } else {
+      try {
+        iaddr = InetAddress.getByName(node.getHost());
+      } catch (UnknownHostException e) {
+        LOG.warn("Unknown host: " + node.getHost(), e);
+        return isExcludeList;
+      }
+    }
+
+    // if include list is empty, host is in include list
+    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
+      return true;
+    }
+    return // compare ipaddress(:port)
+    (hostsList.contains(iaddr.getHostAddress().toString()))
+        || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+            + node.getPort()))
+        // compare hostname(:port)
+        || (hostsList.contains(iaddr.getHostName()))
+        || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+        || ((node instanceof DatanodeInfo) && hostsList
+            .contains(((DatanodeInfo) node).getHostName()));
+  }
+
+  /**
+   * Decommission the node if it is in exclude list.
+   */
+  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
+    throws IOException {
+    // If the registered node is in exclude list, then decommission it
+    if (inExcludedHostsList(nodeReg, ipAddr)) {
+      namesystem.getBlockManager().startDecommission(nodeReg);
+    }
+  }
+
+  
+  /**
+   * Generate new storage ID.
+   * 
+   * @return unique storage ID
+   * 
+   * Note that collisions are still possible if somebody tries 
+   * to bring in a data storage from a different cluster.
+   */
+  private String newStorageID() {
+    String newID = null;
+    while(newID == null) {
+      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
+      if (namesystem.datanodeMap.get(newID) != null)
+        newID = null;
+    }
+    return newID;
+  }
+
+  public void registerDatanode(DatanodeRegistration nodeReg
+      ) throws IOException {
+    String dnAddress = Server.getRemoteAddress();
+    if (dnAddress == null) {
+      // Mostly called inside an RPC.
+      // But if not, use address passed by the data-node.
+      dnAddress = nodeReg.getHost();
+    }      
+
+    // Checks if the node is not on the hosts list.  If it is not, then
+    // it will be disallowed from registering. 
+    if (!inHostsList(nodeReg, dnAddress)) {
+      throw new DisallowedDatanodeException(nodeReg);
+    }
+
+    String hostName = nodeReg.getHost();
+      
+    // update the datanode's name with ip:port
+    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
+                                      nodeReg.getStorageID(),
+                                      nodeReg.getInfoPort(),
+                                      nodeReg.getIpcPort());
+    nodeReg.updateRegInfo(dnReg);
+    nodeReg.exportedKeys = namesystem.getBlockKeys();
+      
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+        + "node registration from " + nodeReg.getName()
+        + " storage " + nodeReg.getStorageID());
+
+    DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+    DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
+      
+    if (nodeN != null && nodeN != nodeS) {
+      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+                        + "node from name: " + nodeN.getName());
+      // nodeN previously served a different data storage, 
+      // which is not served by anybody anymore.
+      namesystem.removeDatanode(nodeN);
+      // physically remove node from datanodeMap
+      wipeDatanode(nodeN);
+      nodeN = null;
+    }
+
+    if (nodeS != null) {
+      if (nodeN == nodeS) {
+        // The same datanode has just been restarted to serve the same data 
+        // storage. We do not need to remove old data blocks, the delta will
+        // be calculated on the next block report from the datanode
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+                                        + "node restarted.");
+        }
+      } else {
+        // nodeS is found
+        /* The registering datanode is a replacement node for the existing 
+          data storage, which from now on will be served by a new node.
+          If this message repeats, both nodes might have the same storageID 
+          by (insanely rare) random chance. User needs to restart one of the
+          nodes with its data cleared (or user can just remove the StorageID
+          value in "VERSION" file under the data directory of the datanode,
+          but this might not work if the VERSION file format has changed). 
+       */        
+        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+                                      + "node " + nodeS.getName()
+                                      + " is replaced by " + nodeReg.getName() + 
+                                      " with the same storageID " +
+                                      nodeReg.getStorageID());
+      }
+      // update cluster map
+      getNetworkTopology().remove(nodeS);
+      nodeS.updateRegInfo(nodeReg);
+      nodeS.setHostName(hostName);
+      nodeS.setDisallowed(false); // Node is in the include list
+      
+      // resolve network location
+      resolveNetworkLocation(nodeS);
+      getNetworkTopology().add(nodeS);
+        
+      // also treat the registration message as a heartbeat
+      synchronized(namesystem.heartbeats) {
+        if( !namesystem.heartbeats.contains(nodeS)) {
+          namesystem.heartbeats.add(nodeS);
+          //update its timestamp
+          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+          nodeS.isAlive = true;
+        }
+      }
+      checkDecommissioning(nodeS, dnAddress);
+      return;
+    } 
+
+    // this is a new datanode serving a new data storage
+    if (nodeReg.getStorageID().equals("")) {
+      // this data storage has never been registered
+      // it is either empty or was created by pre-storageID version of DFS
+      nodeReg.storageID = newStorageID();
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "new storageID " + nodeReg.getStorageID() + " assigned.");
+      }
+    }
+    // register new datanode
+    DatanodeDescriptor nodeDescr 
+      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
+    resolveNetworkLocation(nodeDescr);
+    addDatanode(nodeDescr);
+    checkDecommissioning(nodeDescr, dnAddress);
+    
+    // also treat the registration message as a heartbeat
+    synchronized(namesystem.heartbeats) {
+      namesystem.heartbeats.add(nodeDescr);
+      nodeDescr.isAlive = true;
+      // no need to update its timestamp
+      // because it is done when the descriptor is created
+    }
+  }
+
+  /** Reread include/exclude files. */
+  public void refreshHostsReader(Configuration conf) throws IOException {
+    // Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
+    // Update the file names and refresh internal includes and excludes list.
+    if (conf == null) {
+      conf = new HdfsConfiguration();
+    }
+    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""), 
+                                conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+    hostsReader.refresh();
+  }
+  
+  /**
+   * Rereads the config to get hosts and exclude list file names.
+   * Rereads the files to update the hosts and exclude lists.  It
+   * checks if any of the hosts have changed states:
+   * 1. Added to hosts  --> no further work needed here.
+   * 2. Removed from hosts --> mark AdminState as decommissioned. 
+   * 3. Added to exclude --> start decommission.
+   * 4. Removed from exclude --> stop decommission.
+   */
+  public void refreshDatanodes() throws IOException {
+    for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+      // Check if not include.
+      if (!inHostsList(node, null)) {
+        node.setDisallowed(true);  // case 2.
+      } else {
+        if (inExcludedHostsList(node, null)) {
+          namesystem.getBlockManager().startDecommission(node);   // case 3.
+        } else {
+          namesystem.getBlockManager().stopDecommission(node);   // case 4.
+        }
+      }
+    }
+  }
+
+  /** For generating datanode reports */
+  public List<DatanodeDescriptor> getDatanodeListForReport(
+      final DatanodeReportType type) {
+    boolean listLiveNodes = type == DatanodeReportType.ALL ||
+                            type == DatanodeReportType.LIVE;
+    boolean listDeadNodes = type == DatanodeReportType.ALL ||
+                            type == DatanodeReportType.DEAD;
+
+    HashMap<String, String> mustList = new HashMap<String, String>();
+
+    if (listDeadNodes) {
+      //first load all the nodes listed in include and exclude files.
+      Iterator<String> it = hostsReader.getHosts().iterator();
+      while (it.hasNext()) {
+        mustList.put(it.next(), "");
+      }
+      it = hostsReader.getExcludedHosts().iterator(); 
+      while (it.hasNext()) {
+        mustList.put(it.next(), "");
+      }
+    }
+
+    ArrayList<DatanodeDescriptor> nodes = null;
+    
+    synchronized (namesystem.datanodeMap) {
+      nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() + 
+                                                mustList.size());
+      Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+      while (it.hasNext()) { 
+        DatanodeDescriptor dn = it.next();
+        boolean isDead = namesystem.isDatanodeDead(dn);
+        if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+          nodes.add(dn);
+        }
+        // Remove any form of this datanode from the include/exclude lists.
+        try {
+          InetAddress inet = InetAddress.getByName(dn.getHost());
+          // compare hostname(:port)
+          mustList.remove(inet.getHostName());
+          mustList.remove(inet.getHostName()+":"+dn.getPort());
+          // compare ipaddress(:port)
+          mustList.remove(inet.getHostAddress().toString());
+          mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+        } catch ( UnknownHostException e ) {
+          mustList.remove(dn.getName());
+          mustList.remove(dn.getHost());
+          LOG.warn(e);
+        }
+      }
+    }
+    
+    if (listDeadNodes) {
+      Iterator<String> it = mustList.keySet().iterator();
+      while (it.hasNext()) {
+        DatanodeDescriptor dn = 
+            new DatanodeDescriptor(new DatanodeID(it.next()));
+        dn.setLastUpdate(0);
+        nodes.add(dn);
+      }
+    }
+    return nodes;
+  }
 }
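
This change (HDFS-2167) moves hostsReader and dnsToSwitchMapping from FSNamesystem into DatanodeManager, along with datanode registration, refresh and include/exclude handling. Below is a self-contained sketch of the include/exclude membership test, with hypothetical names and simplified matching; the real checkInList() additionally resolves the address via InetAddress and handles DatanodeInfo host names.

import java.util.Set;

/**
 * Standalone sketch of the include/exclude membership test: an empty include
 * list admits every node, otherwise the node is looked up by ip, ip:port,
 * hostname and hostname:port. Hypothetical method, not the Hadoop API.
 */
class HostListSketch {
  static boolean inList(Set<String> hostsList, boolean isExcludeList,
                        String ip, String hostname, int port) {
    if (!isExcludeList && hostsList.isEmpty()) {
      return true;  // empty include list: every host is allowed
    }
    return hostsList.contains(ip)
        || hostsList.contains(ip + ":" + port)
        || hostsList.contains(hostname)
        || hostsList.contains(hostname + ":" + port);
  }

  public static void main(String[] args) {
    Set<String> include = Set.of("dn1.example.com", "10.0.0.2:50010");
    System.out.println(inList(include, false, "10.0.0.1", "dn1.example.com", 50010)); // true
    System.out.println(inList(include, false, "10.0.0.3", "dn3.example.com", 50010)); // false
    System.out.println(inList(Set.of(), true, "10.0.0.3", "dn3.example.com", 50010)); // false: empty exclude list
  }
}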

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Tue Jul 26 01:53:10 2011
@@ -35,9 +35,11 @@ class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
   private final FSNamesystem fsnamesystem;
+  private final BlockManager blockManager;
 
   DecommissionManager(FSNamesystem namesystem) {
     this.fsnamesystem = namesystem;
+    this.blockManager = fsnamesystem.getBlockManager();
   }
 
   /** Periodically check decommission status. */
@@ -88,7 +90,7 @@ class DecommissionManager {
 
         if (d.isDecommissionInProgress()) {
           try {
-            fsnamesystem.checkDecommissionStateInternal(d);
+            blockManager.checkDecommissionStateInternal(d);
           } catch(Exception e) {
             LOG.warn("entry=" + entry, e);
           }
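
DecommissionManager now calls checkDecommissionStateInternal() on BlockManager rather than on FSNamesystem, matching the move of the decommission logic shown in the BlockManager hunk above. A toy sketch of that life cycle follows, with illustrative types only (not the Hadoop code): the manager polls periodically, and the block-management side marks a node decommissioned once no block on it is still under-replicated.

/**
 * Toy sketch of the decommission life cycle shared by the two classes above.
 */
class DecommissionSketch {
  enum AdminState { NORMAL, DECOMMISSION_IN_PROGRESS, DECOMMISSIONED }

  static class Node {
    AdminState state = AdminState.DECOMMISSION_IN_PROGRESS;
    int underReplicatedBlocks;
    Node(int underReplicated) { this.underReplicatedBlocks = underReplicated; }
  }

  /** Analogous to checkDecommissionStateInternal(): returns true once complete. */
  static boolean checkDecommissionState(Node node) {
    if (node.state == AdminState.DECOMMISSION_IN_PROGRESS
        && node.underReplicatedBlocks == 0) {
      node.state = AdminState.DECOMMISSIONED;
      System.out.println("Decommission complete");
    }
    return node.state == AdminState.DECOMMISSIONED;
  }

  public static void main(String[] args) {
    Node node = new Node(2);
    System.out.println(checkDecommissionState(node));  // false: replicas still being copied
    node.underReplicatedBlocks = 0;                    // replication caught up
    System.out.println(checkDecommissionState(node));  // true
  }
}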

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jul 26 01:53:10 2011
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.http.HtmlQuoting;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
@@ -487,8 +487,8 @@ public class JspHelper {
     if (namenodeAddressInUrl != null) {
       namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
     } else if (context != null) {
-      namenodeAddress = (InetSocketAddress) context
-          .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+      namenodeAddress = NameNodeHttpServer.getNameNodeAddressFromContext(
+          context); 
     }
     if (namenodeAddress != null) {
       return (namenodeAddress.getAddress().getHostAddress() + ":" 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Jul 26 01:53:10 2011
@@ -99,23 +99,16 @@ public class FSDataset implements FSCons
         }
       } else {
         File[] files = FileUtil.listFiles(dir); 
-        int numChildren = 0;
+        List<FSDir> dirList = new ArrayList<FSDir>();
         for (int idx = 0; idx < files.length; idx++) {
           if (files[idx].isDirectory()) {
-            numChildren++;
+            dirList.add(new FSDir(files[idx]));
           } else if (Block.isBlockFilename(files[idx])) {
             numBlocks++;
           }
         }
-        if (numChildren > 0) {
-          children = new FSDir[numChildren];
-          int curdir = 0;
-          for (int idx = 0; idx < files.length; idx++) {
-            if (files[idx].isDirectory()) {
-              children[curdir] = new FSDir(files[idx]);
-              curdir++;
-            }
-          }
+        if (dirList.size() > 0) {
+          children = dirList.toArray(new FSDir[dirList.size()]);
         }
       }
     }
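
The FSDataset change (HDFS-1774) replaces the two-pass count-then-fill of the children array with a single pass that collects subdirectories into a list and converts it with toArray(). A standalone sketch of the same idea, under assumed, simplified types:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the single-pass directory scan: collect child directories into a
 * List while walking the listing once, then convert to an array at the end.
 * Behaviour matches the old two-pass version; the scan just happens once.
 */
class DirScanSketch {
  static File[] childDirectories(File dir) {
    File[] files = dir.listFiles();
    if (files == null) {
      return new File[0];
    }
    List<File> dirList = new ArrayList<>();
    for (File f : files) {
      if (f.isDirectory()) {
        dirList.add(f);  // single pass: collect as we go
      }
    }
    return dirList.toArray(new File[0]);
  }

  public static void main(String[] args) {
    for (File d : childDirectories(new File("."))) {
      System.out.println(d.getName());
    }
  }
}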

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java Tue Jul 26 01:53:10 2011
@@ -41,13 +41,24 @@ public class RoundRobinVolumesPolicy imp
     }
     
     int startVolume = curVolume;
+    long maxAvailable = 0;
     
     while (true) {
       FSVolume volume = volumes.get(curVolume);
       curVolume = (curVolume + 1) % volumes.size();
-      if (volume.getAvailable() > blockSize) { return volume; }
+      long availableVolumeSize = volume.getAvailable();
+      if (availableVolumeSize > blockSize) { return volume; }
+      
+      if (availableVolumeSize > maxAvailable) {
+        maxAvailable = availableVolumeSize;
+      }
+      
       if (curVolume == startVolume) {
-        throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+        throw new DiskOutOfSpaceException(
+            "Insufficient space for an additional block. Volume with the most available space has "
+                + maxAvailable
+                + " bytes free, configured block size is "
+                + blockSize);
       }
     }
   }
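
The RoundRobinVolumesPolicy change (HDFS-1739) tracks the largest available volume size seen during the scan so the DiskOutOfSpaceException can report it. A self-contained sketch of that round-robin selection follows, using a simplified Volume type in place of FSVolume and IllegalStateException in place of DiskOutOfSpaceException.

import java.util.List;

/**
 * Sketch of round-robin volume choice: walk the volumes starting from the
 * cursor, remember the largest free space seen, and if the scan wraps around
 * without finding room, report that maximum in the exception message.
 */
class RoundRobinSketch {
  static class Volume {
    final long available;
    Volume(long available) { this.available = available; }
  }

  private int curVolume = 0;

  Volume chooseVolume(List<Volume> volumes, long blockSize) {
    int startVolume = curVolume;
    long maxAvailable = 0;
    while (true) {
      Volume volume = volumes.get(curVolume);
      curVolume = (curVolume + 1) % volumes.size();
      if (volume.available > blockSize) {
        return volume;
      }
      maxAvailable = Math.max(maxAvailable, volume.available);
      if (curVolume == startVolume) {
        throw new IllegalStateException("Insufficient space for an additional block."
            + " Volume with the most available space has " + maxAvailable
            + " bytes free, configured block size is " + blockSize);
      }
    }
  }

  public static void main(String[] args) {
    RoundRobinSketch policy = new RoundRobinSketch();
    List<Volume> volumes = List.of(new Volume(1L << 20), new Volume(1L << 30));
    System.out.println("chose volume with "
        + policy.chooseVolume(volumes, 64L << 20).available + " bytes free");
  }
}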

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Jul 26 01:53:10 2011
@@ -110,10 +110,10 @@ public class BackupNode extends NameNode
     String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
     return NetUtils.createSocketAddr(addr);
   }
-
+  
   @Override // NameNode
   protected void setHttpServerAddress(Configuration conf){
-    conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress));
+    conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(getHttpAddress()));
   }
 
   @Override // NameNode

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -46,8 +45,7 @@ public class CancelDelegationTokenServle
       throws ServletException, IOException {
     final UserGroupInformation ugi;
     final ServletContext context = getServletContext();
-    final Configuration conf = 
-      (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
     try {
       ugi = getUGI(req, conf);
     } catch(IOException ioe) {
@@ -57,7 +55,8 @@ public class CancelDelegationTokenServle
           "Unable to identify or authenticate user");
       return;
     }
-    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+        context);
     String tokenString = req.getParameter(TOKEN);
     if (tokenString == null) {
       resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Jul 26 01:53:10 2011
@@ -105,11 +105,6 @@ class Checkpointer extends Daemon {
     String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, 
                                    DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT);
     infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":"));
-    
-    HttpServer httpServer = backupNode.httpServer;
-    httpServer.setAttribute("name.system.image", getFSImage());
-    httpServer.setAttribute("name.conf", conf);
-    httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
 
     LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
              "(" + checkpointPeriod/60 + " min)");

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Tue Jul 26 01:53:10 2011
@@ -75,13 +75,14 @@ abstract class DfsServlet extends HttpSe
     ServletContext context = getServletContext();
     // if we are running in the Name Node, use it directly rather than via 
     // rpc
-    NameNode nn = (NameNode) context.getAttribute("name.node");
+    NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     if (nn != null) {
       return nn;
     }
-    InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
+    InetSocketAddress nnAddr =
+      NameNodeHttpServer.getNameNodeAddressFromContext(context);
     Configuration conf = new HdfsConfiguration(
-        (Configuration)context.getAttribute(JspHelper.CURRENT_CONF));
+        NameNodeHttpServer.getConfFromContext(context));
     return DFSUtil.createNamenode(nnAddr, conf);
   }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jul 26 01:53:10 2011
@@ -152,7 +152,7 @@ public class FSDirectory implements Clos
   }
 
   private BlockManager getBlockManager() {
-    return getFSNamesystem().blockManager;
+    return getFSNamesystem().getBlockManager();
   }
 
   /**