You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 03:53:21 UTC

svn commit: r1150969 [2/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/or...

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul 26 01:53:10 2011
@@ -31,7 +31,6 @@ import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -135,11 +135,8 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -147,8 +144,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -240,7 +235,7 @@ public class FSNamesystem implements FSC
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
-  BlockManager blockManager;
+  private BlockManager blockManager;
   
   // Block pool ID used by this namenode
   String blockPoolId;
@@ -287,7 +282,7 @@ public class FSNamesystem implements FSC
   Daemon hbthread = null;   // HeartbeatMonitor thread
   public Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
-  public Daemon replthread = null;  // Replication thread
+  
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
 
   private volatile boolean hasResourcesAvailable = false;
@@ -299,8 +294,6 @@ public class FSNamesystem implements FSC
   // heartbeatExpireInterval is how long namenode waits for datanode to report
   // heartbeat
   private long heartbeatExpireInterval;
-  //replicationRecheckInterval is how often namenode checks for new replication work
-  private long replicationRecheckInterval;
 
   //resourceRecheckInterval is how often namenode checks for the disk space availability
   private long resourceRecheckInterval;
@@ -315,10 +308,6 @@ public class FSNamesystem implements FSC
       ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
-    
-  private DNSToSwitchMapping dnsToSwitchMapping;
-
-  private HostsFileReader hostsReader; 
 
   private long maxFsObjects = 0;          // maximum number of fs objects
 
@@ -377,9 +366,6 @@ public class FSNamesystem implements FSC
       this.dir = new FSDirectory(fsImage, this, conf);
     }
     this.safeMode = new SafeModeInfo(conf);
-    this.hostsReader = new HostsFileReader(
-      conf.get(DFSConfigKeys.DFS_HOSTS,""),
-      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE,""));
     if (isBlockTokenEnabled) {
       blockTokenSecretManager = new BlockTokenSecretManager(true,
           blockKeyUpdateInterval, blockTokenLifetime);
@@ -400,27 +386,13 @@ public class FSNamesystem implements FSC
     blockManager.activate(conf);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
-    this.replthread = new Daemon(new ReplicationMonitor());
+    
     hbthread.start();
     lmthread.start();
-    replthread.start();
 
     this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
     nnrmthread.start();
 
-    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
-        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
-                      ScriptBasedMapping.class,
-            DNSToSwitchMapping.class), conf);
-    
-    /* If the dns to switch mapping supports cache, resolve network
-     * locations of those hosts in the include list, 
-     * and store the mapping in the cache; so future calls to resolve
-     * will be fast.
-     */
-    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
-    }
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
   }
@@ -551,9 +523,7 @@ public class FSNamesystem implements FSC
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
-    this.replicationRecheckInterval = 
-      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
-                  DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    
     this.serverDefaults = new FsServerDefaults(
         conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
         conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
@@ -622,7 +592,6 @@ public class FSNamesystem implements FSC
     try {
       if (blockManager != null) blockManager.close();
       if (hbthread != null) hbthread.interrupt();
-      if (replthread != null) replthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
       if (nnrmthread != null) nnrmthread.interrupt();
@@ -770,7 +739,7 @@ public class FSNamesystem implements FSC
    * 
    * @return current access keys
    */
-  ExportedBlockKeys getBlockKeys() {
+  public ExportedBlockKeys getBlockKeys() {
     return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
         : ExportedBlockKeys.DUMMY_KEYS;
   }
@@ -1838,8 +1807,8 @@ public class FSNamesystem implements FSC
     }
 
     // choose new datanodes.
-    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
-        src, numAdditionalNodes, clientnode, chosen, true,
+    final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
+        ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
         excludes, preferredblocksize);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
     if (isBlockTokenEnabled) {
@@ -2782,162 +2751,12 @@ public class FSNamesystem implements FSC
       throws IOException {
     writeLock();
     try {
-      registerDatanodeInternal(nodeReg);
+      getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
+      checkSafeMode();
     } finally {
       writeUnlock();
     }
   }
-
-  /** @see #registerDatanode(DatanodeRegistration) */
-  public void registerDatanodeInternal(DatanodeRegistration nodeReg)
-      throws IOException {
-    assert hasWriteLock();
-    String dnAddress = Server.getRemoteAddress();
-    if (dnAddress == null) {
-      // Mostly called inside an RPC.
-      // But if not, use address passed by the data-node.
-      dnAddress = nodeReg.getHost();
-    }      
-
-    // check if the datanode is allowed to be connect to the namenode
-    if (!verifyNodeRegistration(nodeReg, dnAddress)) {
-      throw new DisallowedDatanodeException(nodeReg);
-    }
-
-    String hostName = nodeReg.getHost();
-      
-    // update the datanode's name with ip:port
-    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
-                                      nodeReg.getStorageID(),
-                                      nodeReg.getInfoPort(),
-                                      nodeReg.getIpcPort());
-    nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getBlockKeys();
-      
-    NameNode.stateChangeLog.info(
-                                 "BLOCK* NameSystem.registerDatanode: "
-                                 + "node registration from " + nodeReg.getName()
-                                 + " storage " + nodeReg.getStorageID());
-
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN =
-        blockManager.getDatanodeManager().getDatanodeByHost(nodeReg.getName());
-      
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
-                        + "node from name: " + nodeN.getName());
-      // nodeN previously served a different data storage, 
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      blockManager.getDatanodeManager().wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data 
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
-                                        + "node restarted.");
-        }
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing 
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have same storageID 
-          by (insanely rare) random chance. User needs to restart one of the
-          nodes with its data cleared (or user can just remove the StorageID
-          value in "VERSION" file under the data directory of the datanode,
-          but this is might not work if VERSION file format has changed 
-       */        
-        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
-                                      + "node " + nodeS.getName()
-                                      + " is replaced by " + nodeReg.getName() + 
-                                      " with the same storageID " +
-                                      nodeReg.getStorageID());
-      }
-      // update cluster map
-      blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setHostName(hostName);
-      nodeS.setDisallowed(false); // Node is in the include list
-      
-      // resolve network location
-      resolveNetworkLocation(nodeS);
-      blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
-        
-      // also treat the registration message as a heartbeat
-      synchronized(heartbeats) {
-        if( !heartbeats.contains(nodeS)) {
-          heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
-          nodeS.isAlive = true;
-        }
-      }
-      checkDecommissioning(nodeS, dnAddress);
-      return;
-    } 
-
-    // this is a new datanode serving a new data storage
-    if (nodeReg.getStorageID().equals("")) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.storageID = newStorageID();
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "BLOCK* NameSystem.registerDatanode: "
-            + "new storageID " + nodeReg.getStorageID() + " assigned.");
-      }
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
-    resolveNetworkLocation(nodeDescr);
-    blockManager.getDatanodeManager().addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr, dnAddress);
-    
-    // also treat the registration message as a heartbeat
-    synchronized(heartbeats) {
-      heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because its is done when the descriptor is created
-    }
-
-    checkSafeMode();
-  }
-    
-  /* Resolve a node's network location */
-  private void resolveNetworkLocation (DatanodeDescriptor node) {
-    assert hasWriteLock();
-    List<String> names = new ArrayList<String>(1);
-    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      // get the node's IP address
-      names.add(node.getHost());
-    } else {
-      // get the node's host name
-      String hostName = node.getHostName();
-      int colon = hostName.indexOf(":");
-      hostName = (colon==-1)?hostName:hostName.substring(0,colon);
-      names.add(hostName);
-    }
-    
-    // resolve its network location
-    List<String> rName = dnsToSwitchMapping.resolve(names);
-    String networkLocation;
-    if (rName == null) {
-      LOG.error("The resolve call returned null! Using " + 
-          NetworkTopology.DEFAULT_RACK + " for host " + names);
-      networkLocation = NetworkTopology.DEFAULT_RACK;
-    } else {
-      networkLocation = rName.get(0);
-    }
-    node.setNetworkLocation(networkLocation);
-  }
   
   /**
    * Get registrationID for datanodes based on the namespaceID.
@@ -2948,26 +2767,8 @@ public class FSNamesystem implements FSC
   public String getRegistrationID() {
     return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
-    
-  /**
-   * Generate new storage ID.
-   * 
-   * @return unique storage ID
-   * 
-   * Note: that collisions are still possible if somebody will try 
-   * to bring in a data storage from a different cluster.
-   */
-  private String newStorageID() {
-    String newID = null;
-    while(newID == null) {
-      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (datanodeMap.get(newID) != null)
-        newID = null;
-    }
-    return newID;
-  }
-    
-  private boolean isDatanodeDead(DatanodeDescriptor node) {
+
+  public boolean isDatanodeDead(DatanodeDescriptor node) {
     return (node.getLastUpdate() <
             (now() - heartbeatExpireInterval));
   }
@@ -3080,7 +2881,7 @@ public class FSNamesystem implements FSC
     return null;
   }
 
-  private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+  public void updateStats(DatanodeDescriptor node, boolean isAdded) {
     //
     // The statistics are protected by the heartbeat lock
     // For decommissioning/decommissioned nodes, only used capacity
@@ -3204,77 +3005,7 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /**
-   * Periodically calls computeReplicationWork().
-   */
-  class ReplicationMonitor implements Runnable {
-    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
-    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
-    public void run() {
-      while (fsRunning) {
-        try {
-          computeDatanodeWork();
-          blockManager.processPendingReplications();
-          Thread.sleep(replicationRecheckInterval);
-        } catch (InterruptedException ie) {
-          LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
-          break;
-        } catch (IOException ie) {
-          LOG.warn("ReplicationMonitor thread received exception. " + ie);
-        } catch (Throwable t) {
-          LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
-          Runtime.getRuntime().exit(-1);
-        }
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by the Namenode system, to see
-  // if there is any work for registered datanodes.
-  //
-  /////////////////////////////////////////////////////////
-  /**
-   * Compute block replication and block invalidation work 
-   * that can be scheduled on data-nodes.
-   * The datanode will be informed of this work at the next heartbeat.
-   * 
-   * @return number of blocks scheduled for replication or removal.
-   * @throws IOException
-   */
-  public int computeDatanodeWork() throws IOException {
-    int workFound = 0;
-    int blocksToProcess = 0;
-    int nodesToProcess = 0;
-    // Blocks should not be replicated or removed if in safe mode.
-    // It's OK to check safe mode here w/o holding lock, in the worst
-    // case extra replications will be scheduled, and these will get
-    // fixed up later.
-    if (isInSafeMode())
-      return workFound;
-
-    synchronized (heartbeats) {
-      blocksToProcess = (int)(heartbeats.size() 
-          * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
-      nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
-          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
-    }
-
-    workFound = blockManager.computeReplicationWork(blocksToProcess);
-    
-    // Update FSNamesystemMetrics counters
-    writeLock();
-    try {
-      blockManager.updateState();
-      blockManager.scheduledReplicationBlocksCount = workFound;
-    } finally {
-      writeUnlock();
-    }
-    workFound += blockManager.computeInvalidateWork(nodesToProcess);
-    return workFound;
-  }
-
+ 
   public void setNodeReplicationLimit(int limit) {
     blockManager.maxReplicationStreams = limit;
   }
@@ -3282,10 +3013,10 @@ public class FSNamesystem implements FSC
   /**
    * Remove a datanode descriptor.
    * @param nodeID datanode ID.
-   * @throws IOException
+   * @throws UnregisteredNodeException 
    */
-  public void removeDatanode(DatanodeID nodeID) 
-    throws IOException {
+  public void removeDatanode(final DatanodeID nodeID
+      ) throws UnregisteredNodeException {
     writeLock();
     try {
       DatanodeDescriptor nodeInfo = getDatanode(nodeID);
@@ -3666,83 +3397,23 @@ public class FSNamesystem implements FSC
   }
 
   int getNumberOfDatanodes(DatanodeReportType type) {
-    return getDatanodeListForReport(type).size(); 
-  }
-
-  private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-      DatanodeReportType type) {
     readLock();
-    try {    
-      boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                              type == DatanodeReportType.LIVE;
-      boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                              type == DatanodeReportType.DEAD;
-  
-      HashMap<String, String> mustList = new HashMap<String, String>();
-  
-      if (listDeadNodes) {
-        //first load all the nodes listed in include and exclude files.
-        Iterator<String> it = hostsReader.getHosts().iterator();
-        while (it.hasNext()) {
-          mustList.put(it.next(), "");
-        }
-        it = hostsReader.getExcludedHosts().iterator(); 
-        while (it.hasNext()) {
-          mustList.put(it.next(), "");
-        }
-      }
-
-      ArrayList<DatanodeDescriptor> nodes = null;
-      
-      synchronized (datanodeMap) {
-        nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                  mustList.size());
-        Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-        while (it.hasNext()) { 
-          DatanodeDescriptor dn = it.next();
-          boolean isDead = isDatanodeDead(dn);
-          if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
-            nodes.add(dn);
-          }
-          //Remove any form of the this datanode in include/exclude lists.
-          try {
-            InetAddress inet = InetAddress.getByName(dn.getHost());
-            // compare hostname(:port)
-            mustList.remove(inet.getHostName());
-            mustList.remove(inet.getHostName()+":"+dn.getPort());
-            // compare ipaddress(:port)
-            mustList.remove(inet.getHostAddress().toString());
-            mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
-          } catch ( UnknownHostException e ) {
-            mustList.remove(dn.getName());
-            mustList.remove(dn.getHost());
-            LOG.warn(e);
-          }
-        }
-      }
-      
-      if (listDeadNodes) {
-        Iterator<String> it = mustList.keySet().iterator();
-        while (it.hasNext()) {
-          DatanodeDescriptor dn = 
-              new DatanodeDescriptor(new DatanodeID(it.next()));
-          dn.setLastUpdate(0);
-          nodes.add(dn);
-        }
-      }
-      return nodes;
+    try {
+      return getBlockManager().getDatanodeManager().getDatanodeListForReport(
+          type).size(); 
     } finally {
       readUnlock();
     }
   }
 
-  public DatanodeInfo[] datanodeReport( DatanodeReportType type)
-      throws AccessControlException {
+  DatanodeInfo[] datanodeReport(final DatanodeReportType type
+      ) throws AccessControlException {
+    checkSuperuserPrivilege();
     readLock();
     try {
-      checkSuperuserPrivilege();
-  
-      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
+      final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
+
       DatanodeInfo[] arr = new DatanodeInfo[results.size()];
       for (int i=0; i<arr.length; i++) {
         arr[i] = new DatanodeInfo(results.get(i));
@@ -3806,8 +3477,8 @@ public class FSNamesystem implements FSC
                                           ArrayList<DatanodeDescriptor> dead) {
     readLock();
     try {
-      ArrayList<DatanodeDescriptor> results = 
-                              getDatanodeListForReport(DatanodeReportType.ALL);    
+      final List<DatanodeDescriptor> results = getBlockManager(
+          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);    
       for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
         DatanodeDescriptor node = it.next();
         if (isDatanodeDead(node))
@@ -3838,43 +3509,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /**
-   * Start decommissioning the specified datanode. 
-   */
-  private void startDecommission(DatanodeDescriptor node) 
-    throws IOException {
-    assert hasWriteLock();
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
-          node.numBlocks() +  " blocks.");
-      synchronized (heartbeats) {
-        updateStats(node, false);
-        node.startDecommission();
-        updateStats(node, true);
-      }
-      node.decommissioningStatus.setStartTime(now());
-      
-      // all the blocks that reside on this node have to be replicated.
-      checkDecommissionStateInternal(node);
-    }
-  }
-
-  /**
-   * Stop decommissioning the specified datanodes.
-   */
-  public void stopDecommission(DatanodeDescriptor node) 
-    throws IOException {
-    assert hasWriteLock();
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning node " + node.getName());
-      synchronized (heartbeats) {
-        updateStats(node, false);
-        node.stopDecommission();
-        updateStats(node, true);
-      }
-    }
-  }
-
   public Date getStartTime() {
     return new Date(systemStart); 
   }
@@ -3899,85 +3533,6 @@ public class FSNamesystem implements FSC
     return replication;
   }
     
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
-    assert hasWriteLock();
-    //
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    //
-    if (node.isDecommissionInProgress()) {
-      if (!blockManager.isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for node " + node.getName());
-      }
-    }
-    return node.isDecommissioned();
-  }
-
-  /** 
-   * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
-   */
-  private boolean inHostsList(DatanodeID node, String ipAddr) {
-    Set<String> hostsList = hostsReader.getHosts();
-     return checkInList(node, ipAddr, hostsList, false);
-  }
-  
-  private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
-    Set<String> excludeList = hostsReader.getExcludedHosts();
-    return checkInList(node, ipAddr, excludeList, true);
-  }
-
-
-  /**
-   * Check if the given node (of DatanodeID or ipAddress) is in the (include or 
-   * exclude) list.  If ipAddress in null, check only based upon the given 
-   * DatanodeID.  If ipAddress is not null, the ipAddress should refers to the
-   * same host that given DatanodeID refers to.
-   * 
-   * @param node, DatanodeID, the host DatanodeID
-   * @param ipAddress, if not null, should refers to the same host
-   *                   that DatanodeID refers to
-   * @param hostsList, the list of hosts in the include/exclude file
-   * @param isExcludeList, boolean, true if this is the exclude list
-   * @return boolean, if in the list
-   */
-  private boolean checkInList(DatanodeID node, String ipAddress,
-      Set<String> hostsList, boolean isExcludeList) {
-    InetAddress iaddr = null;
-    try {
-      if (ipAddress != null) {
-        iaddr = InetAddress.getByName(ipAddress);
-      } else {
-        iaddr = InetAddress.getByName(node.getHost());
-      }
-    }catch (UnknownHostException e) {
-      LOG.warn("Unknown host in host list: "+ipAddress);
-      // can't resolve the host name.
-      if (isExcludeList){
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    // if include list is empty, host is in include list
-    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
-      return true;
-    }
-    return // compare ipaddress(:port)
-    (hostsList.contains(iaddr.getHostAddress().toString()))
-        || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
-            + node.getPort()))
-        // compare hostname(:port)
-        || (hostsList.contains(iaddr.getHostName()))
-        || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
-        || ((node instanceof DatanodeInfo) && hostsList
-            .contains(((DatanodeInfo) node).getHostName()));
-  }
   
   /**
    * Rereads the config to get hosts and exclude list file names.
@@ -3990,29 +3545,10 @@ public class FSNamesystem implements FSC
    */
   public void refreshNodes(Configuration conf) throws IOException {
     checkSuperuserPrivilege();
-    // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
-    // Update the file names and refresh internal includes and excludes list
-    if (conf == null)
-      conf = new HdfsConfiguration();
-    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS,""), 
-                                conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
-    hostsReader.refresh();
+    getBlockManager().getDatanodeManager().refreshHostsReader(conf);
     writeLock();
     try {
-      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-           it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        // Check if not include.
-        if (!inHostsList(node, null)) {
-          node.setDisallowed(true);  // case 2.
-        } else {
-          if (inExcludedHostsList(node, null)) {
-            startDecommission(node);   // case 3.
-          } else {
-            stopDecommission(node);   // case 4.
-          }
-        }
-      }
+      getBlockManager().getDatanodeManager().refreshDatanodes();
     } finally {
       writeUnlock();
     }
@@ -4022,27 +3558,7 @@ public class FSNamesystem implements FSC
     checkSuperuserPrivilege();
     getFSImage().finalizeUpgrade();
   }
-
-  /**
-   * Checks if the node is not on the hosts list.  If it is not, then
-   * it will be disallowed from registering. 
-   */
-  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
-    assert hasWriteLock();
-    return inHostsList(nodeReg, ipAddr);
-  }
     
-  /**
-   * Decommission the node if it is in exclude list.
-   */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
-    throws IOException {
-    assert hasWriteLock();
-    // If the registered node is in exclude list, then decommission it
-    if (inExcludedHostsList(nodeReg, ipAddr)) {
-      startDecommission(nodeReg);
-    }
-  }
     
   /**
    * Get data node by storage ID.
@@ -4051,7 +3567,8 @@ public class FSNamesystem implements FSC
    * @return DatanodeDescriptor or null if the node is not found.
    * @throws IOException
    */
-  public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID
+      ) throws UnregisteredNodeException {
     assert hasReadOrWriteLock();
     UnregisteredNodeException e = null;
     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
@@ -5395,8 +4912,8 @@ public class FSNamesystem implements FSC
     try {
       ArrayList<DatanodeDescriptor> decommissioningNodes = 
         new ArrayList<DatanodeDescriptor>();
-      ArrayList<DatanodeDescriptor> results = 
-        getDatanodeListForReport(DatanodeReportType.LIVE);
+      final List<DatanodeDescriptor> results = getBlockManager(
+          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
       for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
         DatanodeDescriptor node = it.next();
         if (node.isDecommissionInProgress()) {
@@ -5808,50 +5325,9 @@ public class FSNamesystem implements FSC
   public BlockManager getBlockManager() {
     return blockManager;
   }
-
-  /**
-   * Remove an already decommissioned data node who is neither in include nor
-   * exclude hosts lists from the the list of live or dead nodes.  This is used
-   * to not display an already decommssioned data node to the operators.
-   * The operation procedure of making a already decommissioned data node not
-   * to be displayed is as following:
-   * <ol>
-   *   <li> 
-   *   Host must have been in the include hosts list and the include hosts list
-   *   must not be empty.
-   *   </li>
-   *   <li>
-   *   Host is decommissioned by remaining in the include hosts list and added
-   *   into the exclude hosts list. Name node is updated with the new 
-   *   information by issuing dfsadmin -refreshNodes command.
-   *   </li>
-   *   <li>
-   *   Host is removed from both include hosts and exclude hosts lists.  Name 
-   *   node is updated with the new informationby issuing dfsamin -refreshNodes 
-   *   command.
-   *   <li>
-   * </ol>
-   * 
-   * @param nodeList
-   *          , array list of live or dead nodes.
-   */
-  void removeDecomNodeFromList(ArrayList<DatanodeDescriptor> nodeList) {
-    // If the include list is empty, any nodes are welcomed and it does not
-    // make sense to exclude any nodes from the cluster. Therefore, no remove.
-    if (hostsReader.getHosts().isEmpty()) {
-      return;
-    }
-    
-    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
-          && node.isDecommissioned()) {
-        // Include list is not empty, an existing datanode does not appear
-        // in both include or exclude lists and it has been decommissioned.
-        // Remove it from the node list.
-        it.remove();
-      }
-    }
+  
+  void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
+    getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
   }
 
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jul 26 01:53:10 2011
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
 import org.apache.hadoop.net.NetUtils;
@@ -57,10 +56,10 @@ public class FileChecksumServlets {
     public void doGet(HttpServletRequest request, HttpServletResponse response
         ) throws ServletException, IOException {
       final ServletContext context = getServletContext();
-      final Configuration conf = 
-        (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+      final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
       final UserGroupInformation ugi = getUGI(request, conf);
-      final NameNode namenode = (NameNode)context.getAttribute("name.node");
+      final NameNode namenode = NameNodeHttpServer.getNameNodeFromContext(
+          context);
       final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
       try {
         final URI uri = createRedirectUri("/getFileChecksum", ugi, datanode, 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Jul 26 01:53:10 2011
@@ -65,7 +65,8 @@ public class FileDataServlet extends Dfs
     }
 
     // Add namenode address to the url params
-    NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+    NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+        getServletContext());
     String addr = NameNode.getHostPortString(nn.getNameNodeAddress());
     String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
     
@@ -85,7 +86,8 @@ public class FileDataServlet extends Dfs
       throws IOException {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
-      NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+      NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
+          getServletContext());
       return NamenodeJspHelper.getRandomDatanode(nn);
     }
     return JspHelper.bestNode(blks);
@@ -101,8 +103,8 @@ public class FileDataServlet extends Dfs
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response)
       throws IOException {
-    final Configuration conf = 
-      (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(
+        getServletContext());
     final UserGroupInformation ugi = getUGI(request, conf);
 
     try {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Tue Jul 26 01:53:10 2011
@@ -30,7 +30,6 @@ import javax.servlet.http.HttpServletRes
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -49,17 +48,15 @@ public class FsckServlet extends DfsServ
     final PrintWriter out = response.getWriter();
     final InetAddress remoteAddress = 
       InetAddress.getByName(request.getRemoteAddr());
-    final Configuration conf = 
-      (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
+    final ServletContext context = getServletContext();    
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
 
     final UserGroupInformation ugi = getUGI(request, conf);
     try {
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws Exception {
-          final ServletContext context = getServletContext();
-          
-          NameNode nn = (NameNode) context.getAttribute("name.node");
+          NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
           
           final FSNamesystem namesystem = nn.getNamesystem();
           final int totalDatanodes = 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -49,8 +48,7 @@ public class GetDelegationTokenServlet e
       throws ServletException, IOException {
     final UserGroupInformation ugi;
     final ServletContext context = getServletContext();
-    final Configuration conf = 
-      (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
     try {
       ugi = getUGI(req, conf);
     } catch(IOException ioe) {
@@ -61,7 +59,7 @@ public class GetDelegationTokenServlet e
       return;
     }
     LOG.info("Sending token: {" + ugi.getUserName() + "," + req.getRemoteAddr() +"}");
-    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     String renewer = req.getParameter(RENEWER);
     final String renewerFinal = (renewer == null) ? 
         req.getUserPrincipal().getName() : renewer;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Tue Jul 26 01:53:10 2011
@@ -70,7 +70,7 @@ public class GetImageServlet extends Htt
                     ) throws ServletException, IOException {
     try {
       ServletContext context = getServletContext();
-      final FSImage nnImage = (FSImage)context.getAttribute("name.system.image");
+      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
       final GetImageParams parsedParams = new GetImageParams(request, response);
       final Configuration conf = 
         (Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Jul 26 01:53:10 2011
@@ -127,7 +127,7 @@ public class INodeFile extends INode {
       size += in.blocks.length;
     }
     
-    for(BlockInfo bi: this.blocks) {
+    for(BlockInfo bi: newlist) {
       bi.setINode(this);
     }
     this.blocks = newlist;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jul 26 01:53:10 2011
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -67,7 +66,6 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -82,7 +80,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -96,7 +93,6 @@ import org.apache.hadoop.security.Groups
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -209,8 +205,6 @@ public class NameNode implements Namenod
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
   
-  public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
-
   protected FSNamesystem namesystem; 
   protected NamenodeRole role;
   /** RPC server. Package-protected for use in tests. */
@@ -226,9 +220,7 @@ public class NameNode implements Namenod
   /** RPC server for DN address */
   protected InetSocketAddress serviceRPCAddress = null;
   /** httpServer */
-  protected HttpServer httpServer;
-  /** HTTP server address */
-  protected InetSocketAddress httpAddress = null;
+  protected NameNodeHttpServer httpServer;
   private Thread emptier;
   /** only used for testing purposes  */
   protected boolean stopRequested = false;
@@ -373,9 +365,10 @@ public class NameNode implements Namenod
     return  NetUtils.createSocketAddr(
         conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50070"));
   }
-
-  protected void setHttpServerAddress(Configuration conf){
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, getHostPortString(httpAddress));
+  
+  protected void setHttpServerAddress(Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+        getHostPortString(getHttpAddress()));
   }
 
   protected void loadNamesystem(Configuration conf) throws IOException {
@@ -389,12 +382,21 @@ public class NameNode implements Namenod
   NamenodeRegistration setRegistration() {
     nodeRegistration = new NamenodeRegistration(
         getHostPortString(rpcAddress),
-        getHostPortString(httpAddress),
+        getHostPortString(getHttpAddress()),
         getFSImage().getStorage(), getRole());
     return nodeRegistration;
   }
 
   /**
+   * Login as the configured user for the NameNode.
+   */
+  void loginAsNameNodeUser(Configuration conf) throws IOException {
+    InetSocketAddress socAddr = getRpcServerAddress(conf);
+    SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+  }
+  
+  /**
    * Initialize name-node.
    * 
    * @param conf the configuration
@@ -402,8 +404,7 @@ public class NameNode implements Namenod
   protected void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = getRpcServerAddress(conf);
     UserGroupInformation.setConfiguration(conf);
-    SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+    loginAsNameNodeUser(conf);
     int handlerCount = 
       conf.getInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 
                   DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT);
@@ -515,108 +516,9 @@ public class NameNode implements Namenod
   }
   
   private void startHttpServer(final Configuration conf) throws IOException {
-    final InetSocketAddress infoSocAddr = getHttpServerAddress(conf);
-    final String infoHost = infoSocAddr.getHostName();
-    if(UserGroupInformation.isSecurityEnabled()) {
-      String httpsUser = SecurityUtil.getServerPrincipal(conf
-          .get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), infoHost);
-      if (httpsUser == null) {
-        LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY
-            + " not defined in config. Starting http server as "
-            + SecurityUtil.getServerPrincipal(conf
-                .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
-                .getHostName())
-            + ": Kerberized SSL may be not function correctly.");
-      } else {
-        // Kerberized SSL servers must be run from the host principal...
-        LOG.info("Logging in as " + httpsUser + " to start http server.");
-        SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-            DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoHost);
-      }
-    }
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    try {
-      this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
-        @Override
-        public HttpServer run() throws IOException, InterruptedException {
-          int infoPort = infoSocAddr.getPort();
-          httpServer = new HttpServer("hdfs", infoHost, infoPort,
-              infoPort == 0, conf, 
-              new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")));
-
-          boolean certSSL = conf.getBoolean("dfs.https.enable", false);
-          boolean useKrb = UserGroupInformation.isSecurityEnabled();
-          if (certSSL || useKrb) {
-            boolean needClientAuth = conf.getBoolean(
-                DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
-                DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
-            InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf
-                .get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
-                    DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
-            Configuration sslConf = new HdfsConfiguration(false);
-            if (certSSL) {
-              sslConf.addResource(conf.get(
-                  "dfs.https.server.keystore.resource", "ssl-server.xml"));
-            }
-            httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth,
-                useKrb);
-            // assume same ssl port for all datanodes
-            InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf
-                .get("dfs.datanode.https.address", infoHost + ":" + 50475));
-            httpServer.setAttribute("datanode.https.port", datanodeSslPort
-                .getPort());
-          }
-          httpServer.setAttribute("name.node", NameNode.this);
-          httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY,
-              getNameNodeAddress());
-          httpServer.setAttribute("name.system.image", getFSImage());
-          httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
-          httpServer.addInternalServlet("getDelegationToken",
-              GetDelegationTokenServlet.PATH_SPEC, 
-              GetDelegationTokenServlet.class, true);
-          httpServer.addInternalServlet("renewDelegationToken", 
-              RenewDelegationTokenServlet.PATH_SPEC, 
-              RenewDelegationTokenServlet.class, true);
-          httpServer.addInternalServlet("cancelDelegationToken", 
-              CancelDelegationTokenServlet.PATH_SPEC, 
-              CancelDelegationTokenServlet.class, true);
-          httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
-              true);
-          httpServer.addInternalServlet("getimage", "/getimage",
-              GetImageServlet.class, true);
-          httpServer.addInternalServlet("listPaths", "/listPaths/*",
-              ListPathsServlet.class, false);
-          httpServer.addInternalServlet("data", "/data/*",
-              FileDataServlet.class, false);
-          httpServer.addInternalServlet("checksum", "/fileChecksum/*",
-              FileChecksumServlets.RedirectServlet.class, false);
-          httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
-              ContentSummaryServlet.class, false);
-          httpServer.start();
-
-          // The web-server port can be ephemeral... ensure we have the correct
-          // info
-          infoPort = httpServer.getPort();
-          httpAddress = new InetSocketAddress(infoHost, infoPort);
-          setHttpServerAddress(conf);
-          LOG.info(getRole() + " Web-server up at: " + httpAddress);
-          return httpServer;
-        }
-      });
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      if(UserGroupInformation.isSecurityEnabled() && 
-          conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY) != null) {
-        // Go back to being the correct Namenode principal
-        LOG.info("Logging back in as "
-            + SecurityUtil.getServerPrincipal(conf
-                .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), rpcAddress
-                .getHostName()) + " following http server start.");
-        SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-            DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, rpcAddress.getHostName());
-      }
-    }
+    httpServer = new NameNodeHttpServer(conf, this, getHttpServerAddress(conf));
+    httpServer.start();
+    setHttpServerAddress(conf);
   }
 
   /**
@@ -1390,7 +1292,7 @@ public class NameNode implements Namenod
    * @return the http address.
    */
   public InetSocketAddress getHttpAddress() {
-    return httpAddress;
+    return httpServer.getHttpAddress();
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jul 26 01:53:10 2011
@@ -385,7 +385,7 @@ class NamenodeJspHelper {
   static void redirectToRandomDataNode(ServletContext context,
       HttpServletRequest request, HttpServletResponse resp) throws IOException,
       InterruptedException {
-    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     final Configuration conf = (Configuration) context
         .getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeID datanode = getRandomDatanode(nn);
@@ -566,12 +566,12 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
       ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-      final NameNode nn = (NameNode)context.getAttribute("name.node");
+      final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
       nn.getNamesystem().DFSNodesStatus(live, dead);
       nn.getNamesystem().removeDecomNodeFromList(live);
       nn.getNamesystem().removeDecomNodeFromList(dead);
       InetSocketAddress nnSocketAddress = (InetSocketAddress) context
-          .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+          .getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
       String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
           + nnSocketAddress.getPort();
 
@@ -724,7 +724,7 @@ class NamenodeJspHelper {
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = fsn.blockManager.getINode(block);
+        this.inode = fsn.getBlockManager().getINode(block);
       }
     }
 
@@ -799,9 +799,9 @@ class NamenodeJspHelper {
 
         doc.startTag("replicas");
        
-        if (fsn.blockManager.blocksMap.contains(block)) {
+        if (fsn.getBlockManager().blocksMap.contains(block)) {
           Iterator<DatanodeDescriptor> it =
-            fsn.blockManager.blocksMap.nodeIterator(block);
+            fsn.getBlockManager().blocksMap.nodeIterator(block);
 
           while (it.hasNext()) {
             doc.startTag("replica");

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Tue Jul 26 01:53:10 2011
@@ -47,8 +47,7 @@ public class RenewDelegationTokenServlet
       throws ServletException, IOException {
     final UserGroupInformation ugi;
     final ServletContext context = getServletContext();
-    final Configuration conf = 
-      (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
     try {
       ugi = getUGI(req, conf);
     } catch(IOException ioe) {
@@ -58,7 +57,7 @@ public class RenewDelegationTokenServlet
           "Unable to identify or authenticate user");
       return;
     }
-    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
     String tokenString = req.getParameter(TOKEN);
     if (tokenString == null) {
       resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Jul 26 01:53:10 2011
@@ -175,6 +175,7 @@ public class SecondaryNameNode implement
       initialize(conf, commandLineOpts);
     } catch(IOException e) {
       shutdown();
+      LOG.fatal("Failed to start secondary namenode. ", e);
       throw e;
     }
   }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/packages/rpm/spec/hadoop-hdfs.spec Tue Jul 26 01:53:10 2011
@@ -17,9 +17,10 @@
 # RPM Spec file for Hadoop version @version@
 #
 
-%define name         hadoop-hdfs
-%define version      @version@
-%define release      @package.release@
+%define name          hadoop-hdfs
+%define version       @version@
+%define release       @package.release@
+%define major_version %(echo %{version} | cut -d. -f -2)
 
 # Installation Locations
 %define _prefix      @package.prefix@
@@ -75,7 +76,7 @@ Prefix: %{_conf_dir}
 Prefix: %{_log_dir}
 Prefix: %{_pid_dir}
 Buildroot: %{_build_dir}
-Requires: sh-utils, textutils, /usr/sbin/useradd, /usr/sbin/usermod, /sbin/chkconfig, /sbin/service, jdk >= 1.6, hadoop-common >= %{version}
+Requires: sh-utils, textutils, /usr/sbin/useradd, /usr/sbin/usermod, /sbin/chkconfig, /sbin/service, hadoop-common >= %{major_version}.0, hadoop-common <= %{major_version}.9999
 AutoReqProv: no
 Provides: hadoop-hdfs
 

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 01:53:10 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/test/hdfs:1134994-1148523
+/hadoop/common/trunk/hdfs/src/test/hdfs:1134994-1150966
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-1052/src/test/hdfs:987665-1095512

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Jul 26 01:53:10 2011
@@ -59,13 +59,13 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
@@ -282,7 +282,8 @@ public class DFSTestUtil {
 
     do {
       Thread.sleep(1000);
-      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
+      int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
+          b.getLocalBlock());
       curRacks = r[0];
       curReplicas = r[1];
       curNeededReplicas = r[2];

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Tue Jul 26 01:53:10 2011
@@ -17,25 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
 
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 class MockHttpURLConnection extends HttpURLConnection {
-  MockURL m;
-  
-  public MockHttpURLConnection(URL u, MockURL m) {
-    super(u); 
-    this.m = m;
+  private int responseCode = -1;
+  URL m;
+
+  public MockHttpURLConnection(URL u) {
+    super(u);
+    m = u;
   }
   
   public boolean usingProxy(){
@@ -46,7 +51,6 @@ class MockHttpURLConnection extends Http
   }
   
   public void connect() throws IOException {
-    m.setMsg("Connect: "+url+", Range: "+getRequestProperty("Range"));
   }
   
   public InputStream getInputStream() throws IOException {
@@ -64,8 +68,8 @@ class MockHttpURLConnection extends Http
   }
   
   public int getResponseCode() {
-    if (m.responseCode != -1) {
-      return m.responseCode;
+    if (responseCode != -1) {
+      return responseCode;
     } else {
       if (getRequestProperty("Range") == null) {
         return 200;
@@ -74,89 +78,67 @@ class MockHttpURLConnection extends Http
       }
     }
   }
-  
-}
 
-class MockURL extends URLOpener {
-  String msg;
-  public int responseCode = -1;
-  
-  public MockURL(URL u) {
-    super(u);
+  public void setResponseCode(int resCode) {
+    responseCode = resCode;
   }
 
-  public MockURL(String s) throws MalformedURLException {
-    this(new URL(s));
-  }
-
-  public HttpURLConnection openConnection() throws IOException {
-    return new MockHttpURLConnection(url, this);
-  }    
-
-  public void setMsg(String s) {
-    msg = s;
-  }
-  
-  public String getMsg() {
-    return msg;
-  }
 }
 
 public class TestByteRangeInputStream {
   
   @Test
-  public void testByteRange() throws IOException, InterruptedException {
-    MockURL o = new MockURL("http://test/");
-    MockURL r =  new MockURL((URL)null);
-    ByteRangeInputStream is = new ByteRangeInputStream(o, r);
+  public void testByteRange() throws IOException {
+    URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
+    doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+        .openConnection();
+    URLOpener rspy = spy(new URLOpener((URL) null));
+    doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+        .openConnection();
+    ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
 
     assertEquals("getPos wrong", 0, is.getPos());
 
     is.read();
 
-    assertEquals("Initial call made incorrectly", 
-                 "Connect: http://test/, Range: null",
-                 o.getMsg());
+    assertNull("Initial call made incorrectly (Range Check)", ospy
+        .openConnection().getRequestProperty("Range"));
 
     assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
 
-    o.setMsg(null);
-
     is.read();
 
     assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
 
-    assertNull("No additional connections should have been made (no seek)",
-               o.getMsg());
+    // No additional connections should have been made (no seek)
+
+    rspy.setURL(new URL("http://resolvedurl/"));
 
-    r.setMsg(null);
-    r.setURL(new URL("http://resolvedurl/"));
-    
     is.seek(100);
     is.read();
 
-    assertEquals("Seek to 100 bytes made incorrectly", 
-                 "Connect: http://resolvedurl/, Range: bytes=100-",
-                 r.getMsg());
+    assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+        "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
 
-    assertEquals("getPos should be 101 after reading one byte", 101, is.getPos());
+    assertEquals("getPos should be 101 after reading one byte", 101,
+        is.getPos());
 
-    r.setMsg(null);
+    verify(rspy, times(2)).openConnection();
 
     is.seek(101);
     is.read();
 
-    assertNull("Seek to 101 should not result in another request", r.getMsg());
+    verify(rspy, times(2)).openConnection();
+
+    // Seek to 101 should not result in another request
 
-    r.setMsg(null);
     is.seek(2500);
     is.read();
 
-    assertEquals("Seek to 2500 bytes made incorrectly", 
-                 "Connect: http://resolvedurl/, Range: bytes=2500-",
-                 r.getMsg());
+    assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+        "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
 
-    r.responseCode = 200;
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
     is.seek(500);
     
     try {
@@ -168,7 +150,7 @@ public class TestByteRangeInputStream {
                    "HTTP_PARTIAL expected, received 200", e.getMessage());
     }
 
-    r.responseCode = 206;
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
     is.seek(0);
 
     try {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Tue Jul 26 01:53:10 2011
@@ -51,6 +51,8 @@ public class TestDecommission {
   static final int blockSize = 8192;
   static final int fileSize = 16384;
   static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
+  static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
 
   Random myrand = new Random();
   Path hostsFile;
@@ -74,7 +76,10 @@ public class TestDecommission {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
+  
     writeConfigFile(excludeFile, null);
   }
   
@@ -118,49 +123,67 @@ public class TestDecommission {
     stm.close();
     LOG.info("Created file " + name + " with " + repl + " replicas.");
   }
-  
+
   /**
-   * For blocks that reside on the nodes that are down, verify that their
-   * replication factor is 1 more than the specified one.
+   * Verify that the number of replicas is as expected for each block in
+   * the given file.
+   * For blocks with a decommissioned node, verify that their replication
+   * is 1 more than what is specified.
+   * For blocks without decommissioned nodes, verify their replication is
+   * equal to what is specified.
+   * 
+   * @param downnode - if null, there is no decommissioned node for this file.
+   * @return - null if no failure found, else an error message string.
    */
-  private void checkFile(FileSystem fileSys, Path name, int repl,
-                         String downnode, int numDatanodes) throws IOException {
-    //
-    // sleep an additional 10 seconds for the blockreports from the datanodes
-    // to arrive. 
-    //
+  private String checkFile(FileSystem fileSys, Path name, int repl,
+    String downnode, int numDatanodes) throws IOException {
+    boolean isNodeDown = (downnode != null);
     // need a raw stream
-    assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
-        
-    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream) 
+    assertTrue("Not HDFS:"+fileSys.getUri(), 
+    fileSys instanceof DistributedFileSystem);
+    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
       ((DistributedFileSystem)fileSys).open(name);
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
-
     for (LocatedBlock blk : dinfo) { // for each block
       int hasdown = 0;
-      int firstDecomNodeIndex = -1;
       DatanodeInfo[] nodes = blk.getLocations();
-      for (int j = 0; j < nodes.length; j++) {     // for each replica
-        if (nodes[j].getName().equals(downnode)) {
+      for (int j = 0; j < nodes.length; j++) { // for each replica
+        if (isNodeDown && nodes[j].getName().equals(downnode)) {
           hasdown++;
-          LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
-              + " is decommissioned.");
-        }
-        if (nodes[j].isDecommissioned()) {
-          if (firstDecomNodeIndex == -1) {
-            firstDecomNodeIndex = j;
+          //Downnode must actually be decommissioned
+          if (!nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+              nodes[j].getName() + " is given as downnode, " +
+              "but is not decommissioned";
+          }
+          //Decommissioned node (if any) should only be last node in list.
+          if (j != nodes.length - 1) {
+            return "For block " + blk.getBlock() + " decommissioned node "
+              + nodes[j].getName() + " was not last node in list: "
+              + (j + 1) + " of " + nodes.length;
+          }
+          LOG.info("Block " + blk.getBlock() + " replica on " +
+            nodes[j].getName() + " is decommissioned.");
+        } else {
+          //Non-downnodes must not be decommissioned
+          if (nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+              nodes[j].getName() + " is unexpectedly decommissioned";
           }
-          continue;
         }
-        assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
       }
+
       LOG.info("Block " + blk.getBlock() + " has " + hasdown
-          + " decommissioned replica.");
-      assertEquals("Number of replicas for block " + blk.getBlock(),
-                   Math.min(numDatanodes, repl+hasdown), nodes.length);  
+        + " decommissioned replica.");
+      if(Math.min(numDatanodes, repl+hasdown) != nodes.length) {
+        return "Wrong number of replicas for block " + blk.getBlock() +
+          ": " + nodes.length + ", expected " +
+          Math.min(numDatanodes, repl+hasdown);
+      }
     }
+    return null;
   }
-  
+
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
@@ -208,6 +231,15 @@ public class TestDecommission {
     return ret;
   }
 
+  /* stop decommission of the datanode and wait for it to reach the NORMAL state */
+  private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
+    LOG.info("Recommissioning node: " + decommissionedNode.getName());
+    writeConfigFile(excludeFile, null);
+    cluster.getNamesystem().refreshNodes(conf);
+    waitNodeState(decommissionedNode, AdminStates.NORMAL);
+
+  }
+
   /* 
    * Wait till node is fully decommissioned.
    */
@@ -287,6 +319,14 @@ public class TestDecommission {
   }
   
   /**
+   * Tests recommission for non federated cluster
+   */
+  @Test
+  public void testRecommission() throws IOException {
+    testRecommission(1, 6);
+  }
+
+  /**
    * Test decommission for federeated cluster
    */
   @Test
@@ -323,15 +363,68 @@ public class TestDecommission {
         DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
         assertEquals("All datanodes must be alive", numDatanodes, 
             client.datanodeReport(DatanodeReportType.LIVE).length);
-        checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
+        assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
         cleanupFile(fileSys, file1);
       }
     }
-    
-    // Restart the cluster and ensure decommissioned datanodes
+
+    // Restart the cluster and ensure recommissioned datanodes
     // are allowed to register with the namenode
     cluster.shutdown();
     startCluster(numNamenodes, numDatanodes, conf);
+    cluster.shutdown();
+  }
+
+
+  private void testRecommission(int numNamenodes, int numDatanodes) 
+    throws IOException {
+    LOG.info("Starting test testRecommission");
+
+    startCluster(numNamenodes, numDatanodes, conf);
+  
+    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 
+      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+    for(int i = 0; i < numNamenodes; i++) {
+      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+    }
+    Path file1 = new Path("testDecommission.dat");
+    int replicas = numDatanodes - 1;
+      
+    for (int i = 0; i < numNamenodes; i++) {
+      ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+      FileSystem fileSys = cluster.getFileSystem(i);
+      writeFile(fileSys, file1, replicas);
+        
+      // Decommission one node. Verify that node is decommissioned.
+      DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+          AdminStates.DECOMMISSIONED);
+      decommissionedNodes.add(decomNode);
+        
+      // Ensure decommissioned datanode is not automatically shutdown
+      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+      assertEquals("All datanodes must be alive", numDatanodes, 
+          client.datanodeReport(DatanodeReportType.LIVE).length);
+      assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+
+      // stop decommission and check if the new replicas are removed
+      recomissionNode(decomNode);
+      // wait for the block to be deleted
+      int tries = 0;
+      while (tries++ < 20) {
+        try {
+          Thread.sleep(1000);
+          if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
+            break;
+          }
+        } catch (InterruptedException ie) {
+        }
+      }
+      cleanupFile(fileSys, file1);
+      assertTrue("Checked if node was recommissioned " + tries + " times.",
+         tries < 20);
+      LOG.info("tried: " + tries + " times before recommissioned");
+    }
+    cluster.shutdown();
   }
   
   /**

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Jul 26 01:53:10 2011
@@ -68,7 +68,7 @@ public class TestReplicationPolicy exten
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
-    replicator = bm.replicator;
+    replicator = bm.getBlockPlacementPolicy();
     cluster = bm.getDatanodeManager().getNetworkTopology();
     // construct network topology
     for(int i=0; i<NUM_OF_DATANODES; i++) {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Tue Jul 26 01:53:10 2011
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -93,7 +93,7 @@ public class TestJspHelper {
     //Set the nnaddr url parameter to null.
     when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(null);
     InetSocketAddress addr = new InetSocketAddress("localhost", 2222);
-    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+    when(context.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
         .thenReturn(addr);
     verifyServiceInToken(context, request, addr.getAddress().getHostAddress()
         + ":2222");
@@ -102,7 +102,7 @@ public class TestJspHelper {
     token.setService(new Text("3.3.3.3:3333"));
     tokenString = token.encodeToUrlString();
     //Set the name.node.address attribute in Servlet context to null
-    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+    when(context.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
         .thenReturn(null);
     when(request.getParameter(JspHelper.DELEGATION_PARAMETER_NAME)).thenReturn(
         tokenString);

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Tue Jul 26 01:53:10 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -220,7 +221,8 @@ public class TestBlockReport {
     cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
-    cluster.getNamesystem().computeDatanodeWork();
+    BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
+        .getBlockManager());
 
     printStats();
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Tue Jul 26 01:53:10 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -191,7 +192,7 @@ public class TestDataNodeVolumeFailure {
     // now check the number of under-replicated blocks
     FSNamesystem fsn = cluster.getNamesystem();
     // force update of all the metric counts by calling computeDatanodeWork
-    fsn.computeDatanodeWork();
+    BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
     // get all the counts 
     long underRepl = fsn.getUnderReplicatedBlocks();
     long pendRepl = fsn.getPendingReplicationBlocks();

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Tue Jul 26 01:53:10 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import junit.framework.Assert;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -63,5 +64,33 @@ public class TestRoundRobinVolumesPolicy
       // Passed.
     }
   }
+  
+  // chooseVolume should throw DiskOutOfSpaceException with volume and block sizes in exception message.
+  @Test
+  public void testRRPolicyExceptionMessage()
+      throws Exception {
+    final List<FSVolume> volumes = new ArrayList<FSVolume>();
+
+    // First volume, with 500 bytes of space.
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
+
+    // Second volume, with 600 bytes of space.
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
+
+    RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();
+    int blockSize = 700;
+    try {
+      policy.chooseVolume(volumes, blockSize);
+      Assert.fail("expected to throw DiskOutOfSpaceException");
+    } catch (DiskOutOfSpaceException e) {
+      Assert
+          .assertEquals(
+              "Not returnig the expected message",
+              "Insufficient space for an additional block. Volume with the most available space has 600 bytes free, configured block size is " + blockSize, e
+                  .getMessage());
+    }
+  }
 
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1150969&r1=1150968&r2=1150969&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Tue Jul 26 01:53:10 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -1111,9 +1112,11 @@ public class NNThroughputBenchmark {
       // start data-nodes; create a bunch of files; generate block reports.
       blockReportObject.generateInputs(ignore);
       // stop replication monitor
-      namesystem.replthread.interrupt();
+      BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
+          .interrupt();
       try {
-        namesystem.replthread.join();
+        BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
+            .join();
       } catch(InterruptedException ei) {
         return;
       }
@@ -1156,7 +1159,8 @@ public class NNThroughputBenchmark {
       assert daemonId < numThreads : "Wrong daemonId.";
       long start = System.currentTimeMillis();
       // compute data-node work
-      int work = nameNode.getNamesystem().computeDatanodeWork();
+      int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
+          .getNamesystem().getBlockManager());
       long end = System.currentTimeMillis();
       numPendingBlocks += work;
       if(work == 0)