Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC

svn commit: r885143 [11/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Sat Nov 28 20:05:56 2009
@@ -22,69 +22,145 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.io.IOUtils;
 
 /**
- * This class is used by the datanode to maintain the map from a block 
- * to its metadata.
+ * This class is used by datanodes to maintain metadata about their replicas.
+ * It provides a general interface for a replica's meta information.
  */
-class ReplicaInfo {
+abstract public class ReplicaInfo extends Block implements Replica {
+  private FSVolume volume;      // volume where the replica belongs
+  private File     dir;         // directory where block & meta files belong
 
-  private FSVolume volume;       // volume where the block belongs
-  private File     file;         // block file
-  private boolean detached;      // copy-on-write done for block
-
-  ReplicaInfo(FSVolume vol, File file) {
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) {
+    this( blockId, 0L, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(Block block, FSVolume vol, File dir) {
+    this(block.getBlockId(), block.getNumBytes(), 
+        block.getGenerationStamp(), vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp);
     this.volume = vol;
-    this.file = file;
-    detached = false;
+    this.dir = dir;
   }
 
-  ReplicaInfo(FSVolume vol) {
-    this.volume = vol;
-    this.file = null;
-    detached = false;
+  /**
+   * Copy constructor.
+   * @param from the replica to copy from
+   */
+  ReplicaInfo(ReplicaInfo from) {
+    this(from, from.getVolume(), from.getDir());
   }
 
+  /**
+   * Get this replica's meta file name
+   * @return this replica's meta file name
+   */
+  private String getMetaFileName() {
+    return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION; 
+  }
+  
+  /**
+   * Get the full path of this replica's data file
+   * @return the full path of this replica's data file
+   */
+  File getBlockFile() {
+    return new File(getDir(), getBlockName());
+  }
+  
+  /**
+   * Get the full path of this replica's meta file
+   * @return the full path of this replica's meta file
+   */
+  File getMetaFile() {
+    return new File(getDir(), getMetaFileName());
+  }
+  
+  /**
+   * Get the volume where this replica is located on disk
+   * @return the volume where this replica is located on disk
+   */
   FSVolume getVolume() {
     return volume;
   }
-
-  File getFile() {
-    return file;
+  
+  /**
+   * Set the volume where this replica is located on disk
+   */
+  void setVolume(FSVolume vol) {
+    this.volume = vol;
   }
-
-  void setFile(File f) {
-    file = f;
+  
+  /**
+   * Return the parent directory path where this replica is located
+   * @return the parent directory path where this replica is located
+   */
+  File getDir() {
+    return dir;
   }
 
   /**
-   * Is this block already detached?
+   * Set the parent directory where this replica is located
+   * @param dir the parent directory where the replica is located
    */
-  boolean isDetached() {
-    return detached;
+  void setDir(File dir) {
+    this.dir = dir;
   }
 
   /**
-   *  Block has been successfully detached
+   * Check whether this replica has already been unlinked.
+   * @return true if the replica has already been unlinked
+   *         or does not need to be unlinked; false otherwise
    */
-  void setDetached() {
-    detached = true;
+  boolean isUnlinked() {
+    return true;                // no need to be unlinked
   }
 
   /**
+   * Mark this replica as unlinked.
+   */
+  void setUnlinked() {
+    // no need to be unlinked
+  }
+  
+   /**
    * Copy specified file into a temporary file. Then rename the
    * temporary file to the original name. This will cause any
    * hardlinks to the original file to be removed. The temporary
-   * files are created in the detachDir. The temporary files will
+   * files are created in the same directory. The temporary files will
    * be recovered (especially on Windows) on datanode restart.
    */
-  private void detachFile(File file, Block b) throws IOException {
-    File tmpFile = volume.createDetachFile(b, file.getName());
+  private void unlinkFile(File file, Block b) throws IOException {
+    File tmpFile = FSDataset.createTmpFile(b, FSDataset.getUnlinkTmpFile(file));
     try {
       FileInputStream in = new FileInputStream(file);
       try {
@@ -114,33 +190,60 @@
   }
 
   /**
-   * Returns true if this block was copied, otherwise returns false.
+   * Remove a hard link by copying the block to a temporary place and 
+   * then moving it back
+   * @param numLinks number of hard links
+   * @return true if the copy is successful;
+   *         false if the replica is already unlinked or does not need to be unlinked
+   * @throws IOException if there is any copy error
    */
-  boolean detachBlock(Block block, int numLinks) throws IOException {
-    if (isDetached()) {
+  boolean unlinkBlock(int numLinks) throws IOException {
+    if (isUnlinked()) {
       return false;
     }
-    if (file == null || volume == null) {
-      throw new IOException("detachBlock:Block not found. " + block);
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
     }
-    File meta = FSDataset.getMetaFile(file, block);
+    File meta = getMetaFile();
     if (meta == null) {
-      throw new IOException("Meta file not found for block " + block);
+      throw new IOException("Meta file not found for block " + this);
     }
 
     if (HardLink.getLinkCount(file) > numLinks) {
-      DataNode.LOG.info("CopyOnWrite for block " + block);
-      detachFile(file, block);
+      DataNode.LOG.info("CopyOnWrite for block " + this);
+      unlinkFile(file, this);
     }
     if (HardLink.getLinkCount(meta) > numLinks) {
-      detachFile(meta, block);
+      unlinkFile(meta, this);
     }
-    setDetached();
+    setUnlinked();
     return true;
   }
+
+  /**
+   * Set this replica's generation stamp to be a newer one
+   * @param newGS new generation stamp
+   * @throws IOException if the new generation stamp is not greater than the current one
+   */
+  void setNewerGenerationStamp(long newGS) throws IOException {
+    long curGS = getGenerationStamp();
+    if (newGS <= curGS) {
+      throw new IOException("New generation stamp (" + newGS 
+          + ") must be greater than current one (" + curGS + ")");
+    }
+    setGenerationStamp(newGS);
+  }
   
+  @Override  //Object
   public String toString() {
-    return getClass().getSimpleName() + "(volume=" + volume
-        + ", file=" + file + ", detached=" + detached + ")";
+    return getClass().getSimpleName()
+        + ", " + super.toString()
+        + ", " + getState()
+        + "\n  getNumBytes()     = " + getNumBytes()
+        + "\n  getBytesOnDisk()  = " + getBytesOnDisk()
+        + "\n  getVisibleLength()= " + getVisibleLength()
+        + "\n  getVolume()       = " + getVolume()
+        + "\n  getBlockFile()    = " + getBlockFile();
   }
 }
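
The new getBlockFile()/getMetaFile() methods derive both on-disk paths from the replica's directory, block id and generation stamp. A minimal standalone sketch of that naming convention, assuming the usual "blk_<id>" block-file prefix and ".meta" extension (neither constant is spelled out in this hunk):

    import java.io.File;

    // Sketch of the path convention encoded by ReplicaInfo.getBlockFile()
    // and getMetaFile(). The "blk_" prefix and ".meta" extension are
    // assumptions based on the standard HDFS on-disk layout.
    public class ReplicaPathSketch {
      static File blockFile(File dir, long blockId) {
        return new File(dir, "blk_" + blockId);
      }

      static File metaFile(File dir, long blockId, long genStamp) {
        // meta file name = <block file name>_<generation stamp> + extension
        return new File(dir, "blk_" + blockId + "_" + genStamp + ".meta");
      }

      public static void main(String[] args) {
        File dir = new File("/data/dfs/current");                 // hypothetical directory
        System.out.println(blockFile(dir, 1073741825L));          // .../blk_1073741825
        System.out.println(metaFile(dir, 1073741825L, 1001L));    // .../blk_1073741825_1001.meta
      }
    }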

Propchange: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:05:56 2009
@@ -1,2 +1,6 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:713112
+/hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
+/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java Sat Nov 28 20:05:56 2009
@@ -28,7 +28,7 @@
  * 
  * This is the JMX MBean for reporting the DataNode Activity.
  * The MBean is register using the name
- *        "hadoop:service=DataNode,name=DataNodeActivity-<storageid>"
+ *        "hadoop:service=DataNode,name=DataNodeActivity-<hostname>-<portNumber>"
  * 
  * Many of the activity metrics are sampled and averaged on an interval 
  * which can be specified in the metrics config file.
@@ -57,15 +57,17 @@
   final private ObjectName mbeanName;
   private Random rand = new Random(); 
 
-  public DataNodeActivityMBean(final MetricsRegistry mr, final String storageId) {
+  public DataNodeActivityMBean(final MetricsRegistry mr,
+      final String datanodeName) {
     super(mr, "Activity statistics at the DataNode");
-    String storageName;
-    if (storageId.equals("")) {// Temp fix for the uninitialized storage
-      storageName = "UndefinedStorageId" + rand.nextInt();
+    String name;
+    if (datanodeName.equals("")) {// Temp fix for the uninitialized name
+      name = "UndefinedDataNodeName" + rand.nextInt();
     } else {
-      storageName = storageId;
+      name = datanodeName.replace(":", "-");
     }
-    mbeanName = MBeanUtil.registerMBean("DataNode", "DataNodeActivity-" + storageName, this);
+    mbeanName = MBeanUtil.registerMBean("DataNode",
+	"DataNodeActivity-" + name, this);
   }
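
With this change the activity MBean is keyed by the datanode's name (host:port) rather than by storage id, and the colon is replaced so the resulting JMX ObjectName stays well formed. A small sketch of that sanitization, using a made-up host and port:

    import java.util.Random;

    // Sketch of the name handling in the DataNodeActivityMBean constructor above.
    public class MBeanNameSketch {
      static String activityMBeanName(String datanodeName) {
        String name;
        if (datanodeName.equals("")) {        // temp fix for an uninitialized name
          name = "UndefinedDataNodeName" + new Random().nextInt();
        } else {
          name = datanodeName.replace(":", "-");
        }
        return "DataNodeActivity-" + name;
      }

      public static void main(String[] args) {
        System.out.println(activityMBeanName("dn1.example.com:50010"));
        // -> DataNodeActivity-dn1.example.com-50010
      }
    }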
   
 

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Sat Nov 28 20:05:56 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 
 /**
@@ -90,14 +91,14 @@
                     new MetricsTimeVaryingRate("blockReports", registry);
 
     
-  public DataNodeMetrics(Configuration conf, String storageId) {
-    String sessionId = conf.get("session.id"); 
+  public DataNodeMetrics(Configuration conf, String datanodeName) {
+    String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); 
     // Initiate reporting of Java VM metrics
     JvmMetrics.init("DataNode", sessionId);
     
 
     // Now the MBean for the data node
-    datanodeActivityMBean = new DataNodeActivityMBean(registry, storageId);
+    datanodeActivityMBean = new DataNodeActivityMBean(registry, datanodeName);
     
     // Create record for DataNode metrics
     MetricsContext context = MetricsUtil.getContext("dfs");

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Nov 28 20:05:56 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.DNS;
@@ -52,10 +53,10 @@
  * </ol>
  */
 public class BackupNode extends NameNode {
-  private static final String BN_ADDRESS_NAME_KEY = "dfs.backup.address";
-  private static final String BN_ADDRESS_DEFAULT = "localhost:50100";
-  private static final String BN_HTTP_ADDRESS_NAME_KEY = "dfs.backup.http.address";
-  private static final String BN_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
+  private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
+  private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
+  private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+  private static final String BN_HTTP_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
 
   /** Name-node proxy */
   NamenodeProtocol namenode;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
 /**
  * Internal class for block metadata.
@@ -35,12 +36,22 @@
    */
   private Object[] triplets;
 
-  BlockInfo(Block blk, int replication) {
+  protected BlockInfo(Block blk, int replication) {
     super(blk);
     this.triplets = new Object[3*replication];
     this.inode = null;
   }
 
+  /**
+   * Copy constructor.
+   * This is used when converting a BlockInfoUnderConstruction to a complete BlockInfo.
+   * @param from BlockInfo to copy from.
+   */
+  protected BlockInfo(BlockInfo from) {
+    this(from, from.inode.getReplication());
+    this.inode = from.inode;
+  }
+
   INodeFile getINode() {
     return inode;
   }
@@ -64,7 +75,7 @@
     assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
     BlockInfo info = (BlockInfo)triplets[index*3+1];
     assert info == null || 
-        BlockInfo.class.getName().equals(info.getClass().getName()) : 
+        info.getClass().getName().startsWith(BlockInfo.class.getName()) : 
               "BlockInfo is expected at " + index*3;
     return info;
   }
@@ -74,7 +85,7 @@
     assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
     BlockInfo info = (BlockInfo)triplets[index*3+2];
     assert info == null || 
-        BlockInfo.class.getName().equals(info.getClass().getName()) : 
+        info.getClass().getName().startsWith(BlockInfo.class.getName()) : 
               "BlockInfo is expected at " + index*3;
     return info;
   }
@@ -262,6 +273,43 @@
     return true;
   }
 
+  /**
+   * BlockInfo represents a block that is not being constructed.
+   * In order to start modifying the block, the BlockInfo should be converted
+   * to {@link BlockInfoUnderConstruction}.
+   * @return {@link BlockUCState#COMPLETE}
+   */
+  BlockUCState getBlockUCState() {
+    return BlockUCState.COMPLETE;
+  }
+
+  /**
+   * Is this block complete?
+   * 
+   * @return true if the state of the block is {@link BlockUCState#COMPLETE}
+   */
+  boolean isComplete() {
+    return getBlockUCState().equals(BlockUCState.COMPLETE);
+  }
+
+  /**
+   * Convert a complete block to an under construction block.
+   * 
+   * @return BlockInfoUnderConstruction -  an under construction block.
+   */
+  BlockInfoUnderConstruction convertToBlockUnderConstruction(
+      BlockUCState s, DatanodeDescriptor[] targets) {
+    if(isComplete()) {
+      return new BlockInfoUnderConstruction(
+          this, getINode().getReplication(), s, targets);
+    }
+    // the block is already under construction
+    BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
+    ucBlock.setBlockUCState(s);
+    ucBlock.setExpectedLocations(targets);
+    return ucBlock;
+  }
+
   @Override
   public int hashCode() {
     // Super implementation is sufficient
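
The assertions above index into the triplets array in steps of three. A small sketch of that indexing; the meaning of the three slots per replica (datanode, previous block and next block in that datanode's block list) follows the conventional BlockInfo layout and is assumed here rather than shown in this hunk:

    // Sketch of the triplets indexing asserted on in BlockInfo above.
    public class TripletsSketch {
      static int datanodeSlot(int i) { return 3 * i; }     // DatanodeDescriptor (assumed)
      static int previousSlot(int i) { return 3 * i + 1; } // previous BlockInfo (assumed)
      static int nextSlot(int i)     { return 3 * i + 2; } // next BlockInfo (assumed)

      public static void main(String[] args) {
        int replication = 3;
        Object[] triplets = new Object[3 * replication];   // as allocated in the constructor
        System.out.println("triplets.length = " + triplets.length
            + ", replica 1 uses slots "
            + datanodeSlot(1) + ", " + previousSlot(1) + ", " + nextSlot(1)); // 3, 4, 5
      }
    }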

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sat Nov 28 20:05:56 2009
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -37,9 +36,12 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.mortbay.log.Log;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -105,6 +107,9 @@
   // Default number of replicas
   int defaultReplication;
 
+  // variable to enable check for enough racks 
+  boolean shouldCheckForEnoughRacks = true;
+
   /**
    * Last block index used for replication work.
    */
@@ -114,7 +119,7 @@
   Random r = new Random();
 
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -124,24 +129,25 @@
       throws IOException {
     namesystem = fsn;
     pendingReplications = new PendingReplicationBlocks(
-        conf.getInt("dfs.replication.pending.timeout.sec",
-                    -1) * 1000L);
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
     setConfigurationParameters(conf);
     blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
+    this.replicator = BlockPlacementPolicy.getInstance(
+                         conf,
                          namesystem,
                          namesystem.clusterMap);
 
     this.defaultReplication = conf.getInt("dfs.replication", 3);
     this.maxReplication = conf.getInt("dfs.replication.max", 512);
-    this.minReplication = conf.getInt("dfs.replication.min", 1);
+    this.minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+                                      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
     if (minReplication <= 0)
       throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = "
+                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
                             + minReplication
                             + " must be greater than 0");
     if (maxReplication >= (int)Short.MAX_VALUE)
@@ -150,15 +156,19 @@
                             + maxReplication + " must be less than " + (Short.MAX_VALUE));
     if (maxReplication < minReplication)
       throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = "
+                            "Unexpected configuration parameters: dfs.namenode.replication.min = "
                             + minReplication
                             + " must be less than dfs.replication.max = "
                             + maxReplication);
-    this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
+                                                                             : true;
     FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
     FSNamesystem.LOG.info("maxReplication = " + maxReplication);
     FSNamesystem.LOG.info("minReplication = " + minReplication);
     FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
+    FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
   }
 
   void activate() {
@@ -184,6 +194,11 @@
         chooseSourceDatanode(block, containingNodes, numReplicas);
         int usableReplicas = numReplicas.liveReplicas() +
                              numReplicas.decommissionedReplicas();
+       
+        if (block instanceof BlockInfo) {
+          String fileName = ((BlockInfo)block).getINode().getFullPathName();
+          out.print(fileName + ": ");
+        }
         // l: == live:, d: == decommissioned c: == corrupt e: == excess
         out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
                   " (replicas:" +
@@ -227,7 +242,81 @@
    * @return true if the block has minimum replicas
    */
   boolean checkMinReplication(Block block) {
-    return (blocksMap.numNodes(block) >= minReplication);
+    return (countNodes(block).liveReplicas() >= minReplication);
+  }
+
+  /**
+   * Commit the last block of the file.
+   * 
+   * @param fileINode file inode
+   * @param commitBlock - contains client reported block length and generation
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  void commitLastBlock(INodeFileUnderConstruction fileINode, 
+                       Block commitBlock) throws IOException {
+    if(commitBlock == null)
+      return; // not committing, this is a block allocation retry
+    BlockInfo lastBlock = fileINode.getLastBlock();
+    if(lastBlock == null)
+      return; // no blocks in file yet
+    if(lastBlock.isComplete())
+      return; // already completed (e.g. by syncBlock)
+    assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
+      "commitBlock length is less than the stored one "
+      + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
+  }
+
+  /**
+   * Convert a specified block of the file to a complete block.
+   * @param fileINode file
+   * @param blkIndex  block index in the file
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
+    if(blkIndex < 0)
+      return null;
+    BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
+    if(curBlock.isComplete())
+      return curBlock;
+    BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
+    if(ucBlock.numNodes() < minReplication)
+      throw new IOException("Cannot complete block: " +
+          "block does not satisfy minimal replication requirement.");
+    BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+    // replace penultimate block in file
+    fileINode.setBlock(blkIndex, completeBlock);
+    // replace block in the blocksMap
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx);
+      }
+    return block;
+  }
+
+  /**
+   * Convert the last block of the file to an under construction block.
+   * @param fileINode file
+   * @param targets data-nodes that will form the pipeline for this block
+   */
+  void convertLastBlockToUnderConstruction(
+      INodeFileUnderConstruction fileINode,
+      DatanodeDescriptor[] targets) throws IOException {
+    BlockInfo oldBlock = fileINode.getLastBlock();
+    if(oldBlock == null)
+      return;
+    BlockInfoUnderConstruction ucBlock =
+      fileINode.setLastBlock(oldBlock, targets);
+    blocksMap.replaceBlock(ucBlock);
   }
 
   /**
@@ -248,7 +337,7 @@
     return machineSet;
   }
 
-  List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+  List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
       long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
@@ -263,43 +352,12 @@
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas
-          .numCorruptReplicas(blocks[curBlk]);
-      if (numCorruptNodes != numCorruptReplicas) {
-        FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
-            + blocks[curBlk] + "blockMap has " + numCorruptNodes
-            + " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
-                          (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for (Iterator<DatanodeDescriptor> it = 
-             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = 
-            corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt);
-      if (namesystem.isAccessTokenEnabled) {
-        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
-      }
-      results.add(b);
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -308,6 +366,41 @@
     return results;
   }
 
+  /** @return a LocatedBlock for the given block */
+  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+      ) throws IOException {
+    if (!blk.isComplete()) {
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+      final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
+    }
+
+    // get block locations
+    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+    if (numCorruptNodes != numCorruptReplicas) {
+      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+          + blk + " blockMap has " + numCorruptNodes
+          + " but corrupt replicas map has " + numCorruptReplicas);
+    }
+
+    final int numNodes = blocksMap.numNodes(blk);
+    final boolean isCorrupt = numCorruptNodes == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    if (numMachines > 0) {
+      int j = 0;
+      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+          it.hasNext();) {
+        final DatanodeDescriptor d = it.next();
+        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+          machines[j++] = d;
+      }
+    }
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);    
+  }
+
   /**
    * Check whether the replication parameter is within the range
    * determined by system configuration.
@@ -346,12 +439,13 @@
 
   /**
    * Adds block to list of blocks which will be invalidated on specified
-   * datanode and log the move
+   * datanode
    *
    * @param b block
    * @param dn datanode
+   * @param log true to create an entry in the log 
    */
-  void addToInvalidates(Block b, DatanodeInfo dn) {
+  void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
     Collection<Block> invalidateSet = recentInvalidateSets
         .get(dn.getStorageID());
     if (invalidateSet == null) {
@@ -360,20 +454,39 @@
     }
     if (invalidateSet.add(b)) {
       pendingDeletionBlocksCount++;
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " is added to invalidSet of " + dn.getName());
+      if (log) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+            + b + " to " + dn.getName());
+      }
     }
   }
 
   /**
+   * Adds block to list of blocks which will be invalidated on specified
+   * datanode and log the operation
+   *
+   * @param b block
+   * @param dn datanode
+   */
+  void addToInvalidates(Block b, DatanodeInfo dn) {
+    addToInvalidates(b, dn, true);
+  }
+
+  /**
    * Adds block to list of blocks which will be invalidated on all its
    * datanodes.
    */
   private void addToInvalidates(Block b) {
+    StringBuilder datanodes = new StringBuilder();
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
         .hasNext();) {
       DatanodeDescriptor node = it.next();
-      addToInvalidates(b, node);
+      addToInvalidates(b, node, false);
+      datanodes.append(node.getName()).append(" ");
+    }
+    if (datanodes.length() != 0) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+          + b + " to " + datanodes.toString());
     }
   }
 
@@ -431,6 +544,10 @@
       addToInvalidates(storedBlock, node);
       return;
     } 
+
+    // Add replica to the data-node if it is not already there
+    node.addBlock(storedBlock);
+
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
     if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
@@ -613,17 +730,20 @@
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    INodeFile fileINode = null;
+    int additionalReplRequired;
 
     synchronized (namesystem) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
           return false;
         }
+
         requiredReplication = fileINode.getReplication();
 
         // get a source data-node
@@ -640,21 +760,34 @@
         // do not schedule more if enough replicas is already pending
         numEffectiveReplicas = numReplicas.liveReplicas() +
                                 pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
+      
+        if (numEffectiveReplicas >= requiredReplication) {
+          if ( (pendingReplications.getNumReplicas(block) > 0) ||
+               (blockHasEnoughRacks(block)) ) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+            NameNode.stateChangeLog.info("BLOCK* "
+                + "Removing block " + block
+                + " from neededReplications as it has enough replicas.");
+            return false;
+          }
+        }
+
+        if (numReplicas.liveReplicas() < requiredReplication) {
+          additionalReplRequired = requiredReplication - numEffectiveReplicas;
+        } else {
+          additionalReplRequired = 1; //Needed on a new rack
         }
+
       }
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
-        requiredReplication - numEffectiveReplicas,
-        srcNode, containingNodes, null, block.getNumBytes());
+    // It is costly to extract the filename for which chooseTargets is called,
+    // so for now we pass in the Inode itself.
+    DatanodeDescriptor targets[] = 
+                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -662,7 +795,7 @@
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -675,13 +808,25 @@
         NumberReplicas numReplicas = countNodes(block);
         numEffectiveReplicas = numReplicas.liveReplicas() +
         pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
+
+        if (numEffectiveReplicas >= requiredReplication) {
+          if ( (pendingReplications.getNumReplicas(block) > 0) ||
+               (blockHasEnoughRacks(block)) ) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+            NameNode.stateChangeLog.info("BLOCK* "
+                + "Removing block " + block
+                + " from neededReplications as it has enough replicas.");
+            return false;
+          }
+        }
+
+        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+             (!blockHasEnoughRacks(block)) ) {
+          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+            //No use continuing, unless a new rack in this case
+            return false;
+          }
         }
 
         // Add block to the to be replicated list
@@ -803,10 +948,13 @@
       synchronized (namesystem) {
         for (int i = 0; i < timedOutItems.length; i++) {
           NumberReplicas num = countNodes(timedOutItems[i]);
-          neededReplications.add(timedOutItems[i],
-                                 num.liveReplicas(),
-                                 num.decommissionedReplicas(),
-                                 getReplication(timedOutItems[i]));
+          if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
+                                 num.liveReplicas())) {
+            neededReplications.add(timedOutItems[i],
+                                   num.liveReplicas(),
+                                   num.decommissionedReplicas(),
+                                   getReplication(timedOutItems[i]));
+          }
         }
       }
       /* If we know the target datanodes where the replication timedout,
@@ -828,7 +976,8 @@
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -842,6 +991,9 @@
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -851,7 +1003,8 @@
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -963,13 +1116,17 @@
     int numCurrentReplica = numLiveReplicas
       + pendingReplications.getNumReplicas(storedBlock);
 
+    if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+        numLiveReplicas >= minReplication)
+      storedBlock = completeBlock(fileINode, storedBlock);
+
     // check whether safe replication is reached for the block
-    namesystem.incrementSafeBlockCount(numCurrentReplica);
+    // only complete blocks are counted towards that
+    if(storedBlock.isComplete())
+      namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
     }
@@ -980,7 +1137,7 @@
 
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
-    if (numCurrentReplica >= fileReplication) {
+    if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedReplicas, fileReplication);
     } else {
@@ -1021,8 +1178,10 @@
     boolean gotException = false;
     if (nodes == null)
       return;
-    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
-      DatanodeDescriptor node = it.next();
+    // make a copy of the array of nodes in order to avoid
+    // ConcurrentModificationException, when the block is removed from the node
+    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+    for (DatanodeDescriptor node : nodesCopy) {
       try {
         invalidateBlock(blk, node);
       } catch (IOException e) {
@@ -1058,9 +1217,11 @@
         NumberReplicas num = countNodes(block);
         int numCurrentReplica = num.liveReplicas();
         // add to under-replicated queue if need to be
-        if (neededReplications.add(block, numCurrentReplica, num
-            .decommissionedReplicas(), expectedReplication)) {
-          nrUnderReplicated++;
+        if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+          if (neededReplications.add(block, numCurrentReplica, num
+              .decommissionedReplicas(), expectedReplication)) {
+            nrUnderReplicated++;
+          }
         }
 
         if (numCurrentReplica > expectedReplication) {
@@ -1104,7 +1265,7 @@
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);
+        addedNode, delNodeHint, replicator);
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1192,7 +1353,30 @@
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be only in one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1224,12 +1408,38 @@
     return new NumberReplicas(live, count, corrupt, excess);
   }
 
+  private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
+      NumberReplicas num) {
+    int curReplicas = num.liveReplicas();
+    int curExpectedReplicas = getReplication(block);
+    INode fileINode = blocksMap.getINode(block);
+    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
+    StringBuffer nodeList = new StringBuffer();
+    while (nodeIter.hasNext()) {
+      DatanodeDescriptor node = nodeIter.next();
+      nodeList.append(node.name);
+      nodeList.append(" ");
+    }
+    FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
+        + curExpectedReplicas + ", live replicas: " + curReplicas
+        + ", corrupt replicas: " + num.corruptReplicas()
+        + ", decommissioned replicas: " + num.decommissionedReplicas()
+        + ", excess replicas: " + num.excessReplicas()
+        + ", Is Open File: " + fileINode.isUnderConstruction()
+        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+        + srcNode.name + ", Is current datanode decommissioning: "
+        + srcNode.isDecommissionInProgress());
+  }
+  
   /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
   boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
+    int underReplicatedBlocks = 0;
+    int decommissionOnlyReplicas = 0;
+    int underReplicatedInOpenFiles = 0;
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
       final Block block = it.next();
@@ -1239,14 +1449,27 @@
         NumberReplicas num = countNodes(block);
         int curReplicas = num.liveReplicas();
         int curExpectedReplicas = getReplication(block);
-        if (curExpectedReplicas > curReplicas) {
-          status = true;
+        if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
+          if (curExpectedReplicas > curReplicas) {
+            //Log info about one block for this node which needs replication
+            if (!status) {
+              status = true;
+              logBlockReplicationInfo(block, srcNode, num);
+            }
+            underReplicatedBlocks++;
+            if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+              decommissionOnlyReplicas++;
+            }
+            if (fileINode.isUnderConstruction()) {
+              underReplicatedInOpenFiles++;
+            }
+          }
           if (!neededReplications.contains(block) &&
             pendingReplications.getNumReplicas(block) == 0) {
             //
             // These blocks have been reported from the datanode
             // after the startDecommission method has been executed. These
-            // blocks were in flight when the decommission was started.
+            // blocks were in flight when the decommissioning was started.
             //
             neededReplications.add(block,
                                    curReplicas,
@@ -1256,6 +1479,9 @@
         }
       }
     }
+    srcNode.decommissioningStatus.set(underReplicatedBlocks,
+        decommissionOnlyReplicas, 
+        underReplicatedInOpenFiles);
     return status;
   }
 
@@ -1293,16 +1519,23 @@
     synchronized (namesystem) {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
-      neededReplications.update(block, repl.liveReplicas(), repl
-          .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
-          expectedReplicasDelta);
+      if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
+        neededReplications.update(block, repl.liveReplicas(), repl
+            .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
+            expectedReplicasDelta);
+      } else {
+        int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+        int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+        neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
+                                  oldExpectedReplicas);
+      }
     }
   }
 
   void checkReplication(Block block, int numExpectedReplicas) {
     // filter out containingNodes that are marked for decommission.
     NumberReplicas number = countNodes(block);
-    if (number.liveReplicas() < numExpectedReplicas) {
+    if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { 
       neededReplications.add(block,
                              number.liveReplicas(),
                              number.decommissionedReplicas,
@@ -1384,13 +1617,74 @@
       return blocksToInvalidate.size();
     }
   }
+  
+  //Returns the number of racks over which a given block is replicated
+  //decommissioning/decommissioned nodes are not counted. corrupt replicas 
+  //are also ignored
+  int getNumberOfRacks(Block b) {
+    HashSet<String> rackSet = new HashSet<String>(0);
+    Collection<DatanodeDescriptor> corruptNodes = 
+                                  corruptReplicas.getNodes(b);
+    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); 
+         it.hasNext();) {
+      DatanodeDescriptor cur = it.next();
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+          String rackName = cur.getNetworkLocation();
+          if (!rackSet.contains(rackName)) {
+            rackSet.add(rackName);
+          }
+        }
+      }
+    }
+    return rackSet.size();
+  }
+
+  boolean blockHasEnoughRacks(Block b) {
+    if (!this.shouldCheckForEnoughRacks) {
+      return true;
+    }
+    boolean enoughRacks = false;
+    Collection<DatanodeDescriptor> corruptNodes = 
+                                  corruptReplicas.getNodes(b);
+    int numExpectedReplicas = getReplication(b);
+    String rackName = null;
+    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); 
+         it.hasNext();) {
+      DatanodeDescriptor cur = it.next();
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+          if (numExpectedReplicas == 1) {
+            enoughRacks = true;
+            break;
+          }
+          String rackNameNew = cur.getNetworkLocation();
+          if (rackName == null) {
+            rackName = rackNameNew;
+          } else if (!rackName.equals(rackNameNew)) {
+            enoughRacks = true;
+            break;
+          }
+        }
+      }
+    }
+    return enoughRacks;
+  }
 
+  boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
+    if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+  
   long getMissingBlocksCount() {
     // not locking
     return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
   }
 
-  BlockInfo addINode(Block block, INodeFile iNode) {
+  BlockInfo addINode(BlockInfo block, INodeFile iNode) {
     return blocksMap.addINode(block, iNode);
   }
 
@@ -1409,7 +1703,7 @@
   void removeBlockFromMap(Block block) {
     blocksMap.removeBlock(block);
   }
-  
+
   int getCapacity() {
     synchronized(namesystem) {
       return blocksMap.getCapacity();
@@ -1419,4 +1713,26 @@
   float getLoadFactor() {
     return blocksMap.getLoadFactor();
   }
+  
+  
+  /**
+   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
+   * blocks starting at the next block after startingBlockId are returned
+   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
+   * is null, up to numExpectedBlocks blocks are returned from the beginning.
+   * If startingBlockId cannot be found, null is returned.
+   *
+   * @param numExpectedBlocks Number of block ids to return.
+   *  0 <= numExpectedBlocks <= 100
+   * @param startingBlockId Block id from which to start. If null, start at
+   *  beginning.
+   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+   *
+   */
+  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+                                   Long startingBlockId) {
+    return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
+                                                     startingBlockId);
+  }  
+  
 }
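
The new blockHasEnoughRacks()/isNeededReplication() pair changes when a block counts as under-replicated: even with enough live replicas, replication work is still needed if every usable replica sits on a single rack (unless only one replica is expected, or no topology script is configured). A simplified, self-contained sketch of that check; the Replica record below is a stand-in for DatanodeDescriptor plus the corrupt-replicas map, not the real API:

    import java.util.Arrays;
    import java.util.List;

    public class RackCheckSketch {
      static class Replica {
        final String rack;
        final boolean decommissioning;
        final boolean corrupt;
        Replica(String rack, boolean decommissioning, boolean corrupt) {
          this.rack = rack; this.decommissioning = decommissioning; this.corrupt = corrupt;
        }
      }

      // Mirrors blockHasEnoughRacks(): true once usable replicas span two racks,
      // or immediately when only a single replica is expected.
      static boolean hasEnoughRacks(List<Replica> replicas, int expectedReplication) {
        String firstRack = null;
        for (Replica r : replicas) {
          if (r.decommissioning || r.corrupt) continue;   // skip unusable replicas
          if (expectedReplication == 1) return true;
          if (firstRack == null) {
            firstRack = r.rack;
          } else if (!firstRack.equals(r.rack)) {
            return true;
          }
        }
        return false;
      }

      // Mirrors isNeededReplication(): work is needed if replicas are short
      // or they do not span enough racks.
      static boolean isNeededReplication(List<Replica> replicas, int expected, int live) {
        return live < expected || !hasEnoughRacks(replicas, expected);
      }

      public static void main(String[] args) {
        List<Replica> sameRack = Arrays.asList(
            new Replica("/rack1", false, false),
            new Replica("/rack1", false, false),
            new Replica("/rack1", false, false));
        // Three live replicas, one rack: still flagged for replication work.
        System.out.println(isNeededReplication(sameRack, 3, 3));   // true
      }
    }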

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Sat Nov 28 20:05:56 2009
@@ -75,11 +75,10 @@
   /**
    * Add block b belonging to the specified file inode to the map.
    */
-  BlockInfo addINode(Block b, INodeFile iNode) {
-    int replication = iNode.getReplication();
+  BlockInfo addINode(BlockInfo b, INodeFile iNode) {
     BlockInfo info = map.get(b);
-    if (info == null) {
-      info = new BlockInfo(b, replication);
+    if (info != b) {
+      info = b;
       map.put(info, info);
     }
     info.setINode(iNode);
@@ -191,4 +190,23 @@
   float getLoadFactor() {
     return loadFactor;
   }
+
+  /**
+   * Replace a block in the block map by a new block.
+   * The new block and the old one have the same key.
+   * @param newBlock - block for replacement
+   * @return new block
+   */
+  BlockInfo replaceBlock(BlockInfo newBlock) {
+    BlockInfo currentBlock = map.get(newBlock);
+    assert currentBlock != null : "the block is not in blocksMap";
+    // replace block in data-node lists
+    for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
+      DatanodeDescriptor dn = currentBlock.getDatanode(idx);
+      dn.replaceBlock(currentBlock, newBlock);
+    }
+    // replace block in the map itself
+    map.put(newBlock, newBlock);
+    return newBlock;
+  }
 }
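
BlocksMap.replaceBlock() relies on the replacement block comparing equal to the one already in the map (same block id, same hashCode), so map.get(newBlock) finds the current instance and map.put(newBlock, newBlock) swaps it in after the datanode lists have been rewired. A tiny illustration of that same-key replacement pattern; the Key class is invented for the example:

    import java.util.HashMap;
    import java.util.Map;

    public class ReplaceByEqualKeySketch {
      static class Key {
        final long id;
        Key(long id) { this.id = id; }
        @Override public boolean equals(Object o) { return o instanceof Key && ((Key) o).id == id; }
        @Override public int hashCode() { return (int) (id ^ (id >>> 32)); }
      }

      public static void main(String[] args) {
        Map<Key, Key> map = new HashMap<Key, Key>();
        Key complete = new Key(42);
        map.put(complete, complete);

        Key underConstruction = new Key(42);            // equal key, different instance
        System.out.println(map.get(underConstruction) == complete);  // true: old instance found
        map.put(underConstruction, underConstruction);  // replaces the mapped instance
        System.out.println(map.get(complete) == underConstruction);  // true: new instance returned
      }
    }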

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Nov 28 20:05:56 2009
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
 
@@ -86,8 +87,10 @@
     shouldRun = true;
 
     // Initialize other scheduling parameters from the configuration
-    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
-    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+    checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
+                                    DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
+    checkpointSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY, 
+                                  DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT);
 
     HttpServer httpServer = backupNode.httpServer;
     httpServer.setAttribute("name.system.image", getFSImage());
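
With the literals replaced by DFSConfigKeys constants, deployments tune the checkpoint schedule through the same keys the Checkpointer now reads. A hedged sketch of overriding them programmatically; the numeric values are arbitrary examples, and the key names are whatever the constants resolve to (typically dfs.namenode.checkpoint.period and dfs.namenode.checkpoint.size).

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  public class CheckpointTuningSketch {
    public static void main(String[] args) {
      Configuration conf = new HdfsConfiguration();
      // Example values only: checkpoint every 30 minutes, or once ~8 MB of edits accumulate.
      conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 30 * 60);
      conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY, 8L * 1024 * 1024);
      System.out.println(conf.get(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY));
    }
  }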

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java Sat Nov 28 20:05:56 2009
@@ -33,7 +33,7 @@
 
 public class CorruptReplicasMap{
 
-  private Map<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
+  private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
     new TreeMap<Block, Collection<DatanodeDescriptor>>();
   
   /**
@@ -126,4 +126,59 @@
   public int size() {
     return corruptReplicasMap.size();
   }
+
+  /**
+   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
+   * blocks starting at the next block after startingBlockId are returned
+   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
+   * is null, up to numExpectedBlocks blocks are returned from the beginning.
+   * If startingBlockId cannot be found, null is returned.
+   *
+   * @param numExpectedBlocks Number of block ids to return.
+   *  0 <= numExpectedBlocks <= 100
+   * @param startingBlockId Block id from which to start. If null, start at
+   *  beginning.
+   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+   *
+   */
+  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+                                   Long startingBlockId) {
+    if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
+      return null;
+    }
+    
+    Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
+    
+    // if the starting block id was specified, iterate over keys until
+    // we find the matching block. If we find a matching block, break
+    // to leave the iterator on the next block after the specified block. 
+    if (startingBlockId != null) {
+      boolean isBlockFound = false;
+      while (blockIt.hasNext()) {
+        Block b = blockIt.next();
+        if (b.getBlockId() == startingBlockId) {
+          isBlockFound = true;
+          break; 
+        }
+      }
+      
+      if (!isBlockFound) {
+        return null;
+      }
+    }
+
+    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
+
+    // append up to numExpectedBlocks blockIds to our list
+    for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
+      corruptReplicaBlockIds.add(blockIt.next().getBlockId());
+    }
+    
+    long[] ret = new long[corruptReplicaBlockIds.size()];
+    for(int i=0; i<ret.length; i++) {
+      ret[i] = corruptReplicaBlockIds.get(i);
+    }
+    
+    return ret;
+  }  
 }
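
The new method is a cursor-style pager: a caller passes the last block id it saw and gets the next batch of at most 100 ids, or null if the cursor has disappeared from the map. The following standalone sketch mirrors that resume-after-cursor loop with toy data; no HDFS classes are involved, and the batch size of 100 is just the documented cap.

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  import java.util.TreeSet;

  public class CorruptIdPagingSketch {

    // Return up to 'max' ids following 'startAfter', or null if the cursor is gone.
    static long[] nextBatch(TreeSet<Long> ids, int max, Long startAfter) {
      Iterator<Long> it = ids.iterator();
      if (startAfter != null) {              // skip up to and including the cursor
        boolean found = false;
        while (it.hasNext() && !found) {
          found = it.next().equals(startAfter);
        }
        if (!found) return null;
      }
      List<Long> batch = new ArrayList<Long>();
      while (batch.size() < max && it.hasNext()) {
        batch.add(it.next());
      }
      long[] ret = new long[batch.size()];
      for (int i = 0; i < ret.length; i++) ret[i] = batch.get(i);
      return ret;
    }

    public static void main(String[] args) {
      TreeSet<Long> corruptIds = new TreeSet<Long>();
      for (long id = 1; id <= 250; id++) corruptIds.add(id);

      Long cursor = null;                    // null means start from the beginning
      while (true) {
        long[] batch = nextBatch(corruptIds, 100, cursor);
        if (batch == null || batch.length == 0) break;
        System.out.println("batch of " + batch.length + ", first id " + batch[0]);
        cursor = batch[batch.length - 1];    // resume after the last id returned
      }
    }
  }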

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Nov 28 20:05:56 2009
@@ -25,7 +25,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -43,6 +47,11 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  
+  // Stores status of decommissioning.
+  // If node is not decommissioning, do not use this object for anything.
+  DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
+  
   /** Block and targets pair */
   public static class BlockTargetPair {
     public final Block block;
@@ -55,29 +64,36 @@
   }
 
   /** A BlockTargetPair queue. */
-  private static class BlockQueue {
-    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+  private static class BlockQueue<E> {
+    private final Queue<E> blockq = new LinkedList<E>();
 
     /** Size of the queue */
     synchronized int size() {return blockq.size();}
 
     /** Enqueue */
-    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
-      return blockq.offer(new BlockTargetPair(block, targets));
+    synchronized boolean offer(E e) { 
+      return blockq.offer(e);
     }
 
     /** Dequeue */
-    synchronized List<BlockTargetPair> poll(int numBlocks) {
+    synchronized List<E> poll(int numBlocks) {
       if (numBlocks <= 0 || blockq.isEmpty()) {
         return null;
       }
 
-      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+      List<E> results = new ArrayList<E>();
       for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
         results.add(blockq.poll());
       }
       return results;
     }
+
+    /**
+     * Returns <tt>true</tt> if the queue contains the specified element.
+     */
+    boolean contains(E e) {
+      return blockq.contains(e);
+    }
   }
 
   private volatile BlockInfo blockList = null;
@@ -87,9 +103,10 @@
   protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
-  private BlockQueue replicateBlocks = new BlockQueue();
+  private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
   /** A queue of blocks to be recovered by this datanode */
-  private BlockQueue recoverBlocks = new BlockQueue();
+  private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+                                new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
   private Set<Block> invalidateBlocks = new TreeSet<Block>();
 
@@ -201,6 +218,21 @@
     blockList = b.listInsert(blockList, this);
   }
 
+  /**
+   * Replace specified old block with a new one in the DataNodeDescriptor.
+   * 
+   * @param oldBlock - block to be replaced
+   * @param newBlock - a replacement block
+   * @return the new block
+   */
+  BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
+    boolean done = removeBlock(oldBlock);
+    assert done : "Old block should belong to the data-node when replacing";
+    done = addBlock(newBlock);
+    assert done : "New block should not belong to the data-node when replacing";
+    return newBlock;
+  }
+
   void resetBlocks() {
     this.capacity = 0;
     this.remaining = 0;
@@ -262,15 +294,20 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    replicateBlocks.offer(block, targets);
+    replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
   /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
-    assert(block != null && targets != null && targets.length > 0);
-    recoverBlocks.offer(block, targets);
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+    if(recoverBlocks.contains(block)) {
+      // this prevents adding the same block twice to the recovery queue
+      FSNamesystem.LOG.info("Block " + block +
+                            " is already in the recovery queue.");
+      return;
+    }
+    recoverBlocks.offer(block);
   }
 
   /**
@@ -308,10 +345,16 @@
         new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
   }
 
-  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
-    return blocktargetlist == null? null:
-        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+    if(blocks == null)
+      return null;
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+    for(BlockInfoUnderConstruction b : blocks) {
+      brCommand.add(new RecoveringBlock(
+          b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+    }
+    return brCommand;
   }
 
   /**
@@ -361,44 +404,29 @@
     return blockarray;
   }
 
-  void reportDiff(BlocksMap blocksMap,
+  void reportDiff(BlockManager blockManager,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,    // add to DatanodeDescriptor
                   Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate) { // should be removed from DN
-    // place a deilimiter in the list which separates blocks 
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
     boolean added = this.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
-      newReport = new BlockListAsLongs( new long[0]);
-    // scan the report and collect newly reported blocks
-    // Note we are taking special precaution to limit tmp blocks allocated
-    // as part this block report - which why block list is stored as longs
-    Block iblk = new Block(); // a fixed new'ed block to be reused with index i
-    for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
-      iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
-               newReport.getBlockGenStamp(i));
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
-        continue;
-      }
-      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
-        // if the size differs from what is in the blockmap, then return
-        // the new block. addStoredBlock will then pick up the right size of this
-        // block and will update the block object in the BlocksMap
-        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
-          toAdd.add(new Block(iblk));
-        } else {
-          toAdd.add(storedBlock);
-        }
-        continue;
-      }
+      newReport = new BlockListAsLongs();
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+                                               toAdd, toInvalidate, toCorrupt);
       // move block to the head of the list
-      this.moveBlockToHead(storedBlock);
+      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+        this.moveBlockToHead(storedBlock);
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -408,6 +436,97 @@
     this.removeBlock(delimiter);
   }
 
+  /**
+   * Process a block replica reported by the data-node.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation
+   * stamp and length as recorded on the name-node, then the replica location is
+   * added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * </ol>
+   * 
+   * @param blockManager
+   * @param block reported block replica
+   * @param rState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @return the stored block from the blocks map, or null if the reported
+   *         block is not known to the name-node
+   */
+  BlockInfo processReportedBlock(
+                  BlockManager blockManager,
+                  Block block,                // reported block replica
+                  ReplicaState rState,        // reported replica state
+                  Collection<Block> toAdd,    // add to DatanodeDescriptor
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    FSNamesystem.LOG.debug("Reported block " + block
+        + " on " + getName() + " size " + block.getNumBytes()
+        + " replicaState = " + rState);
+
+    // find block by blockId
+    BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+
+    FSNamesystem.LOG.debug("In memory blockUCState = " + storedBlock.getBlockUCState());
+
+    // Block is on the DN
+    boolean isCorrupt = false;
+    switch(rState) {
+    case FINALIZED:
+      switch(storedBlock.getBlockUCState()) {
+      case COMPLETE:
+      case COMMITTED:
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+            || storedBlock.getNumBytes() != block.getNumBytes())
+          isCorrupt = true;
+        break;
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      }
+      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+        if (storedBlock.getNumBytes() != block.getNumBytes()) {
+          toAdd.add(new Block(block));
+        } else {
+          toAdd.add(storedBlock);
+        }
+      break;
+    case RBW:
+    case RWR:
+      if(!storedBlock.isComplete())
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+                                                      this, block, rState);
+      else
+        isCorrupt = true;
+      break;
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + rState
+          + " for block: " + storedBlock + 
+          " on " + getName() + " size " + storedBlock.getNumBytes());
+      break;
+    }
+    if(isCorrupt)
+        toCorrupt.add(storedBlock);
+    return storedBlock;
+  }
+
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);
@@ -475,4 +594,53 @@
     // by DatanodeID
     return (this == obj) || super.equals(obj);
   }
+  
+  class DecommissioningStatus {
+    int underReplicatedBlocks;
+    int decommissionOnlyReplicas;
+    int underReplicatedInOpenFiles;
+    long startTime;
+    
+    synchronized void set(int underRep,
+        int onlyRep, int underConstruction) {
+      if (isDecommissionInProgress() == false) {
+        return;
+      }
+      underReplicatedBlocks = underRep;
+      decommissionOnlyReplicas = onlyRep;
+      underReplicatedInOpenFiles = underConstruction;
+    }
+    
+    synchronized int getUnderReplicatedBlocks() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return underReplicatedBlocks;
+    }
+    synchronized int getDecommissionOnlyReplicas() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return decommissionOnlyReplicas;
+    }
+
+    synchronized int getUnderReplicatedInOpenFiles() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return underReplicatedInOpenFiles;
+    }
+
+    synchronized void setStartTime(long time) {
+      startTime = time;
+    }
+    
+    synchronized long getStartTime() {
+      if (isDecommissionInProgress() == false) {
+        return 0;
+      }
+      return startTime;
+    }
+  }  // End of class DecommissioningStatus
+  
 }
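
The javadoc on processReportedBlock() above describes a small decision table: unknown blocks are invalidated on the data-node, matching finalized replicas are added, mismatched or unexpectedly finalized replicas are marked corrupt, and RBW/RWR replicas of unfinished blocks are attached to the under-construction block. Below is a compressed, standalone sketch of that classification; the enums and the Decision type are toys for illustration, and it deliberately omits the "re-add with the reported length" refinement in the real method.

  public class ReportedBlockDecisionSketch {
    enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }
    enum BlockUCState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }
    enum Decision { INVALIDATE, ADD, CORRUPT, ATTACH_TO_UC, IGNORE }

    static Decision classify(boolean knownToNameNode,
                             BlockUCState stored,
                             ReplicaState reported,
                             boolean genStampAndLengthMatch) {
      if (!knownToNameNode) {
        return Decision.INVALIDATE;                  // not in the blocks map
      }
      switch (reported) {
      case FINALIZED:
        if (stored == BlockUCState.COMPLETE || stored == BlockUCState.COMMITTED) {
          return genStampAndLengthMatch ? Decision.ADD : Decision.CORRUPT;
        }
        return Decision.ATTACH_TO_UC;                // block still under construction
      case RBW:
      case RWR:
        return (stored == BlockUCState.COMPLETE)     // complete block, stale replica
            ? Decision.CORRUPT : Decision.ATTACH_TO_UC;
      default:                                       // RUR, TEMPORARY: not expected in reports
        return Decision.IGNORE;
      }
    }

    public static void main(String[] args) {
      System.out.println(classify(false, null, ReplicaState.FINALIZED, true)); // INVALIDATE
      System.out.println(classify(true, BlockUCState.COMPLETE,
                                  ReplicaState.FINALIZED, false));             // CORRUPT
      System.out.println(classify(true, BlockUCState.UNDER_CONSTRUCTION,
                                  ReplicaState.RBW, true));                    // ATTACH_TO_UC
    }
  }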

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Sat Nov 28 20:05:56 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -66,7 +67,7 @@
       ) throws IOException {
     ServletContext context = getServletContext();
     InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
-    Configuration conf = new Configuration(
+    Configuration conf = new HdfsConfiguration(
         (Configuration)context.getAttribute("name.conf"));
     UnixUserGroupInformation.saveToConf(conf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
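
The switch from Configuration to HdfsConfiguration matters because HdfsConfiguration registers the HDFS resource files as defaults, so HDFS-specific keys resolve without extra wiring. A tiny sketch, assuming the usual hdfs-default.xml/hdfs-site.xml setup is on the classpath:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  public class HdfsConfSketch {
    public static void main(String[] args) {
      // HdfsConfiguration's class initializer adds hdfs-default.xml and
      // hdfs-site.xml as default resources, so HDFS keys resolve here.
      Configuration conf = new HdfsConfiguration();
      System.out.println(conf.get("dfs.replication"));   // e.g. "3" from the defaults
    }
  }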

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Nov 28 20:05:56 2009
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -79,7 +80,7 @@
     try {
       this.backupNode =
         (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID, bnAddress, new Configuration());
+            NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;