You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/08/28 08:30:15 UTC

svn commit: r1518087 - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/ src...

Author: szetszwo
Date: Wed Aug 28 06:30:15 2013
New Revision: 1518087

URL: http://svn.apache.org/r1518087
Log:
HDFS-4987. Namenode changes to track multiple storages per datanode.

Added:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
Removed:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

Added: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt?rev=1518087&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt (added)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt Wed Aug 28 06:30:15 2013
@@ -0,0 +1,4 @@
+  BREAKDOWN OF HDFS-2832 ENABLE SUPPORT FOR HETEROGENEOUS STORAGES IN HDFS
+
+    HDFS-4987. Namenode changes to track multiple storages per datanode.
+    (szetszwo)

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Aug 28 06:30:15 2013
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -39,11 +40,11 @@ public class BlockInfo extends Block imp
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
   /**
-   * This array contains triplets of references. For each i-th datanode the
-   * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor
-   * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and
-   * the next blocks, respectively, in the list of blocks belonging to this
-   * data-node.
+   * This array contains triplets of references. For the i-th storage that
+   * the block belongs to, triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
    * 
    * Using previous and next in Object triplets is done instead of a
    * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
@@ -86,9 +87,15 @@ public class BlockInfo extends Block imp
   }
 
   DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
+  }
+
+  DatanodeStorageInfo getStorageInfo(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeDescriptor)triplets[index*3];
+    DatanodeStorageInfo storage = (DatanodeStorageInfo)triplets[index*3];
+    return storage;
   }
 
   private BlockInfo getPrevious(int index) {
@@ -111,14 +118,10 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous,
-      BlockInfo next) {
+  void setStorageInfo(int index, DatanodeStorageInfo storage) {
     assert this.triplets != null : "BlockInfo is not initialized";
-    int i = index * 3;
-    assert index >= 0 && i+2 < triplets.length : "Index is out of bound";
-    triplets[i] = node;
-    triplets[i+1] = previous;
-    triplets[i+2] = next;
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
   }
 
   /**
@@ -190,22 +193,24 @@ public class BlockInfo extends Block imp
   }
 
   /**
-   * Add data-node this block belongs to.
+   * Add a {@link DatanodeStorageInfo} location for a block
    */
-  public boolean addNode(DatanodeDescriptor node) {
-    if(findDatanode(node) >= 0) // the node is already there
+  boolean addStorage(DatanodeStorageInfo storage) {
+    if(findStorageInfo(storage) >= 0) // the node is already there
       return false;
     // find the last null node
     int lastNode = ensureCapacity(1);
-    setDatanode(lastNode, node, null, null);
+    setStorageInfo(lastNode, storage);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
     return true;
   }
 
   /**
-   * Remove data-node from the block.
+   * Remove {@link DatanodeStorageInfo} location for a block
    */
-  public boolean removeNode(DatanodeDescriptor node) {
-    int dnIndex = findDatanode(node);
+  boolean removeStorage(DatanodeStorageInfo storage) {
+    int dnIndex = findStorageInfo(storage);
     if(dnIndex < 0) // the node is not found
       return false;
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
@@ -213,10 +218,13 @@ public class BlockInfo extends Block imp
     // find the last not null node
     int lastNode = numNodes()-1; 
     // replace current node triplet by the lastNode one 
-    setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode),
-        getNext(lastNode));
+    setStorageInfo(dnIndex, getStorageInfo(lastNode));
+    setNext(dnIndex, getNext(lastNode)); 
+    setPrevious(dnIndex, getPrevious(lastNode)); 
     // set the last triplet to null
-    setDatanode(lastNode, null, null, null);
+    setStorageInfo(lastNode, null);
+    setNext(lastNode, null); 
+    setPrevious(lastNode, null); 
     return true;
   }
 
@@ -236,37 +244,70 @@ public class BlockInfo extends Block imp
     }
     return -1;
   }
+  /**
+   * Find the DatanodeStorageInfo that belongs to the specified datanode.
+   * @param dn the datanode whose storage is searched for
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeInfo dn) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == null)
+        break;
+      if(cur.getDatanodeDescriptor() == dn)
+        return idx;
+    }
+    return -1;
+  }
+  
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @param storageInfo
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeStorageInfo storageInfo) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == storageInfo)
+        return idx;
+      if(cur == null)
+        break;
+    }
+    return -1;
+  }
 
   /**
    * Insert this block into the head of the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If the head is null then form a new list.
    * @return current block as the new head of the list.
    */
-  public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
-    int dnIndex = this.findDatanode(dn);
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
     assert dnIndex >= 0 : "Data node is not found: current";
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
             "Block is already in the list and cannot be inserted.";
     this.setPrevious(dnIndex, null);
     this.setNext(dnIndex, head);
     if(head != null)
-      head.setPrevious(head.findDatanode(dn), this);
+      head.setPrevious(head.findStorageInfo(storage), this);
     return this;
   }
 
   /**
    * Remove this block from the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If this block is the head of the list then return the next block as 
    * the new head.
    * @return the new head of the list or null if the list becomes
-   * empty after deletion.
+   * empty after deletion.
    */
-  public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
     if(head == null)
       return null;
-    int dnIndex = this.findDatanode(dn);
+    int dnIndex = this.findStorageInfo(storage);
     if(dnIndex < 0) // this block is not on the data-node list
       return head;
 
@@ -275,33 +316,20 @@ public class BlockInfo extends Block imp
     this.setNext(dnIndex, null);
     this.setPrevious(dnIndex, null);
     if(prev != null)
-      prev.setNext(prev.findDatanode(dn), next);
+      prev.setNext(prev.findStorageInfo(storage), next);
     if(next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     if(this == head)  // removing the head
       head = next;
     return head;
   }
 
-  /**
-   * Remove this block from the list of blocks related to the specified
-   * DatanodeDescriptor. Insert it into the head of the list of blocks.
-   *
-   * @return the new head of the list.
-   */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
-      int curIndex, int headIndex) {
-    if (head == this) {
-      return this;
-    }
-    BlockInfo next = this.setNext(curIndex, head);
-    BlockInfo prev = this.setPrevious(curIndex, null);
-
-    head.setPrevious(headIndex, this);
-    prev.setNext(prev.findDatanode(dn), next);
-    if (next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
-    return this;
+  int listCount(DatanodeStorageInfo storage) {
+    int count = 0;
+    for(BlockInfo cur = this; cur != null;
+          cur = cur.getNext(cur.findStorageInfo(storage)))
+      count++;
+    return count;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Aug 28 06:30:15 2013
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -68,8 +69,10 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -1034,7 +1037,7 @@ public class BlockManager {
    * for logging purposes
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
-      final DatanodeInfo dn, String reason) throws IOException {
+      final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
     final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
     if (storedBlock == null) {
@@ -1046,11 +1049,11 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-                                  DatanodeInfo dn) throws IOException {
+      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark " + b
@@ -1066,7 +1069,7 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(b.stored);
+    node.addBlock(storageID, b.stored);
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
@@ -1601,10 +1604,11 @@ public class BlockManager {
   }
 
   /**
-   * The given datanode is reporting all its blocks.
-   * Update the (machine-->blocklist) and (block-->machinelist) maps.
+   * The given storage is reporting all its blocks.
+   * Update the (storage-->block list) and (block-->storage list) maps.
    */
-  public void processReport(final DatanodeID nodeID, final String poolId,
+  public void processReport(final DatanodeID nodeID,
+      final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1628,9 +1632,9 @@ public class BlockManager {
       if (node.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, newReport);
+        processFirstBlockReport(node, storage.getStorageID(), newReport);
       } else {
-        processReport(node, newReport);
+        processReport(node, storage, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
@@ -1691,28 +1695,31 @@ public class BlockManager {
   }
   
   private void processReport(final DatanodeDescriptor node,
+      final DatanodeStorage storage,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toRemove = new TreeSet<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+    reportDiff(node, storage, report,
+        toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b.storedBlock, node,
+          storage.getStorageID(), b.reportedState);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1726,7 +1733,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storage.getStorageID());
     }
   }
 
@@ -1742,6 +1749,7 @@ public class BlockManager {
    * @throws IOException 
    */
   private void processFirstBlockReport(final DatanodeDescriptor node,
+      final String storageID,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
@@ -1754,7 +1762,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, iblk, reportedState,
+        queueReportedBlock(node, storageID, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1771,10 +1779,10 @@ public class BlockManager {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, iblk, reportedState,
+          queueReportedBlock(node, storageID, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node);
+          markBlockAsCorrupt(c, node, storageID);
         }
         continue;
       }
@@ -1787,25 +1795,26 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node);
+        addStoredBlockImmediate(storedBlock, node, storageID);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, 
+  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = dn.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
-    int headIndex = 0; //currently the delimiter is in the head of the list
-    int curIndex;
+
+    dn.updateStorage(storage);
+
+    // add all blocks to remove list
+    for(Iterator<BlockInfo> it = dn.getBlockIterator(storage.getStorageID());
+        it.hasNext(); ) {
+      toRemove.add(it.next());
+    }
 
     if (newReport == null)
       newReport = new BlockListAsLongs();
@@ -1814,20 +1823,10 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-                                  toAdd, toInvalidate, toCorrupt, toUC);
-      // move block to the head of the list
-      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
-      }
+      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+      toRemove.remove(storedBlock);
     }
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
-        delimiter.getNext(0), dn);
-    while(it.hasNext())
-      toRemove.add(it.next());
-    dn.removeBlock(delimiter);
   }
 
   /**
@@ -1861,7 +1860,8 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+      final String storageID,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
@@ -1876,7 +1876,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, block, reportedState,
+      queueReportedBlock(dn, storageID, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -1911,7 +1911,7 @@ assert storedBlock.findDatanode(dn) < 0 
         // If the block is an out-of-date generation stamp or state,
         // but we're the standby, we shouldn't treat it as corrupt,
         // but instead just queue it for later processing.
-        queueReportedBlock(dn, storedBlock, reportedState,
+        queueReportedBlock(dn, storageID, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -1938,7 +1938,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
@@ -1948,7 +1948,7 @@ assert storedBlock.findDatanode(dn) < 0 
           " from datanode " + dn + " for later processing " +
           "because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
   }
 
   /**
@@ -1971,8 +1971,8 @@ assert storedBlock.findDatanode(dn) < 0 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previouly queued message " + rbi);
       }
-      processAndHandleReportedBlock(
-          rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+          rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
   
@@ -2090,18 +2090,18 @@ assert storedBlock.findDatanode(dn) < 0 
   
   void addStoredBlockUnderConstruction(
       BlockInfoUnderConstruction block, 
-      DatanodeDescriptor node, 
+      DatanodeDescriptor node, String storageID, 
       ReplicaState reportedState) 
   throws IOException {
     block.addReplicaIfNotPresent(node, block, reportedState);
     if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, null, true);
+      addStoredBlock(block, node, storageID, null, true);
     }
   }
   
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2112,17 +2112,17 @@ assert storedBlock.findDatanode(dn) < 0 
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-                               DatanodeDescriptor node)
+      DatanodeDescriptor node, String storageID)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, null, false);
+      addStoredBlock(storedBlock, node, storageID, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storedBlock);
+    node.addBlock(storageID, storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2145,6 +2145,7 @@ assert storedBlock.findDatanode(dn) < 0 
    */
   private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
+                               String storageID,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
@@ -2170,7 +2171,7 @@ assert storedBlock.findDatanode(dn) < 0 
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storedBlock);
+    boolean added = node.addBlock(storageID, storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2614,7 +2615,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
@@ -2635,11 +2636,12 @@ assert storedBlock.findDatanode(dn) < 0 
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+  private void processAndHandleReportedBlock(DatanodeDescriptor node,
+      String storageID, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2647,7 +2649,7 @@ assert storedBlock.findDatanode(dn) < 0 
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, reportedState,
+    processReportedBlock(node, storageID, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2655,11 +2657,11 @@ assert storedBlock.findDatanode(dn) < 0 
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b.storedBlock, node, storageID, b.reportedState);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2673,7 +2675,7 @@ assert storedBlock.findDatanode(dn) < 0 
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storageID);
     }
   }
 
@@ -2685,7 +2687,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * This method must be called with FSNamesystem lock held.
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      final String poolId, final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     assert namesystem.hasWriteLock();
     int received = 0;
@@ -2701,19 +2703,19 @@ assert storedBlock.findDatanode(dn) < 0 
           "Got incremental block report from unregistered or dead node");
     }
 
-    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+    for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
         removeStoredBlock(rdbi.getBlock(), node);
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, rdbi.getBlock(),
+        processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
             ReplicaState.RBW, null);
         break;
       default:

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Aug 28 06:30:15 2013
@@ -18,15 +18,20 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
 
@@ -93,8 +98,9 @@ public class DatanodeDescriptor extends 
     }
   }
 
-  private volatile BlockInfo blockList = null;
-  private int numBlocks = 0;
+  private final Map<String, DatanodeStorageInfo> storageMap = 
+      new HashMap<String, DatanodeStorageInfo>();
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -217,39 +223,47 @@ public class DatanodeDescriptor extends 
   }
 
   /**
-   * Add datanode to the block.
-   * Add block to the head of the list of blocks belonging to the data-node.
+   * Add the given storage of this data-node to the block. Add block to the
+   * head of the list of blocks belonging to that storage.
    */
-  public boolean addBlock(BlockInfo b) {
-    if(!b.addNode(this))
-      return false;
-    // add to the head of the data-node list
-    blockList = b.listInsert(blockList, this);
-    numBlocks++;
-    return true;
+  public boolean addBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.addBlock(b);
+    }
+    return false;
   }
-  
+
+  DatanodeStorageInfo getStorageInfo(String storageID) {
+    return storageMap.get(storageID);
+  }
+  public Collection<DatanodeStorageInfo> getStorageInfos() {
+    return storageMap.values();
+  }
+
   /**
-   * Remove block from the list of blocks belonging to the data-node.
-   * Remove datanode from the block.
-   */
-  public boolean removeBlock(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    if ( b.removeNode(this) ) {
-      numBlocks--;
-      return true;
-    } else {
-      return false;
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
+   */
+  boolean removeBlock(BlockInfo b) {
+    int index = b.findStorageInfo(this);
+    DatanodeStorageInfo s = b.getStorageInfo(index);
+    if (s != null) {
+      return s.removeBlock(b);
     }
+    return false;
   }
-
+  
   /**
-   * Move block to the head of the list of blocks belonging to the data-node.
-   * @return the index of the head of the blockList
+   * Remove block from the list of blocks belonging to the given storage of
+   * this data-node. Remove that storage from the block.
    */
-  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
-    return curIndex;
+  boolean removeBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.removeBlock(b);
+    }
+    return false;
   }
 
   /**
@@ -257,7 +271,7 @@ public class DatanodeDescriptor extends 
    * @return the head of the blockList
    */
   protected BlockInfo getHead(){
-    return blockList;
+    return getBlockIterator().next();
   }
 
   /**
@@ -268,9 +282,12 @@ public class DatanodeDescriptor extends 
    * @return the new block
    */
   public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    boolean done = removeBlock(oldBlock);
+    int index = oldBlock.findStorageInfo(this);
+    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
+    boolean done = s.removeBlock(oldBlock);
     assert done : "Old block should belong to the data-node when replacing";
-    done = addBlock(newBlock);
+
+    done = s.addBlock(newBlock);
     assert done : "New block should not belong to the data-node when replacing";
     return newBlock;
   }
@@ -281,7 +298,6 @@ public class DatanodeDescriptor extends 
     setBlockPoolUsed(0);
     setDfsUsed(0);
     setXceiverCount(0);
-    this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
   }
@@ -295,7 +311,12 @@ public class DatanodeDescriptor extends 
   }
 
   public int numBlocks() {
-    return numBlocks;
+    // TODO: synchronization
+    int blocks = 0;
+    for (DatanodeStorageInfo entry : storageMap.values()) {
+      blocks += entry.numBlocks();
+    }
+    return blocks;
   }
 
   /**
@@ -314,38 +335,52 @@ public class DatanodeDescriptor extends 
     rollBlocksScheduled(getLastUpdate());
   }
 
-  /**
-   * Iterates over the list of blocks belonging to the datanode.
-   */
-  public static class BlockIterator implements Iterator<BlockInfo> {
-    private BlockInfo current;
-    private DatanodeDescriptor node;
-      
-    BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
-      this.current = head;
-      this.node = dn;
+  private static class BlockIterator implements Iterator<BlockInfo> {
+    private final int maxIndex;
+    private int index = 0;
+    private List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
+    
+    private BlockIterator(final Iterable<DatanodeStorageInfo> storages) {
+      for (DatanodeStorageInfo e : storages) {
+        iterators.add(e.getBlockIterator());
+      }
+      maxIndex = iterators.size() - 1;
+    }
+
+    private BlockIterator(final DatanodeStorageInfo storage) {
+      iterators.add(storage.getBlockIterator());
+      maxIndex = iterators.size() - 1;
     }
 
     @Override
     public boolean hasNext() {
-      return current != null;
+      update();
+      return iterators.get(index).hasNext();
     }
 
     @Override
     public BlockInfo next() {
-      BlockInfo res = current;
-      current = current.getNext(current.findDatanode(node));
-      return res;
+      update();
+      return iterators.get(index).next();
     }
-
+    
     @Override
-    public void remove()  {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
+    public void remove() {
+      throw new UnsupportedOperationException("Remove unsupported.");
+    }
+    
+    private void update() {
+      while(index < maxIndex && !iterators.get(index).hasNext()) {
+        index++;
+      }
     }
   }
 
-  public Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(this.blockList, this);
+  Iterator<BlockInfo> getBlockIterator() {
+    return new BlockIterator(storageMap.values());
+  }
+  Iterator<BlockInfo> getBlockIterator(final String storageID) {
+    return new BlockIterator(storageMap.get(storageID));
   }
   
   /**
@@ -601,4 +636,15 @@ public class DatanodeDescriptor extends 
     }
     return sb.toString();
   }
+
+  DatanodeStorageInfo updateStorage(DatanodeStorage s) {
+    DatanodeStorageInfo storage = getStorageInfo(s.getStorageID());
+    if (storage == null) {
+      storage = new DatanodeStorageInfo(this, s);
+      storageMap.put(s.getStorageID(), storage);
+    } else {
+      storage.setState(s.getState());
+    }
+    return storage;
+  }
 }

Added: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1518087&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (added)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Wed Aug 28 06:30:15 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+
+/**
+ * A Datanode has one or more storages. A storage in the Datanode is represented
+ * by this class.
+ */
+public class DatanodeStorageInfo {
+  /**
+   * Iterates over the list of blocks belonging to the storage.
+   */
+  static class BlockIterator implements Iterator<BlockInfo> {
+    private BlockInfo current;
+    private DatanodeStorageInfo node;
+
+    BlockIterator(BlockInfo head, DatanodeStorageInfo dn) {
+      this.current = head;
+      this.node = dn;
+    }
+
+    public boolean hasNext() {
+      return current != null;
+    }
+
+    public BlockInfo next() {
+      BlockInfo res = current;
+      current = current.getNext(current.findStorageInfo(node));
+      return res;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("Sorry. can't remove.");
+    }
+  }
+
+  private final DatanodeDescriptor dn;
+  private final String storageID;
+  private final StorageType storageType;
+  private State state;
+  private long capacity;
+  private long dfsUsed;
+  private long remaining;
+  private volatile BlockInfo blockList = null;
+  
+  DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
+    this.dn = dn;
+    this.storageID = s.getStorageID();
+    this.storageType = s.getStorageType();
+    this.state = s.getState();
+  }
+  
+  public void setUtilization(long capacity, long dfsUsed, long remaining) {
+    this.capacity = capacity;
+    this.dfsUsed = dfsUsed;
+    this.remaining = remaining;
+  }
+  
+  public void setState(State s) {
+    this.state = s;
+    
+    // TODO: if the storage goes to a failed state, clean up the block list
+  }
+  
+  public State getState() {
+    return this.state;
+  }
+  
+  public String getStorageID() {
+    return storageID;
+  }
+
+  public long getCapacity() {
+    return capacity;
+  }
+
+  public long getDfsUsed() {
+    return dfsUsed;
+  }
+
+  public long getRemaining() {
+    return remaining;
+  }
+
+  public boolean addBlock(BlockInfo b) {
+    if(!b.addStorage(this))
+      return false;
+    // add to the head of the storage's block list
+    blockList = b.listInsert(blockList, this);
+    return true;
+  }
+
+  public boolean removeBlock(BlockInfo b) {
+    blockList = b.listRemove(blockList, this);
+    return b.removeStorage(this);
+  }
+
+  public int numBlocks() {
+    return blockList == null ? 0 : blockList.listCount(this);
+  }
+  
+  Iterator<BlockInfo> getBlockIterator() {
+    return new BlockIterator(this.blockList, this);
+  }
+
+  public void updateState(StorageReport r) {
+    capacity = r.getCapacity();
+    dfsUsed = r.getDfsUsed();
+    remaining = r.getRemaining();
+  }
+
+  public DatanodeDescriptor getDatanodeDescriptor() {
+    return dn;
+  }
+  
+  @Override
+  public String toString() {
+    return "[" + storageType + "]" + storageID + ":" + state;
+  }
+}

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Wed Aug 28 06:30:15 2013
@@ -42,11 +42,13 @@ class PendingDataNodeMessages {
   static class ReportedBlockInfo {
     private final Block block;
     private final DatanodeDescriptor dn;
+    private final String storageID;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
         ReplicaState reportedState) {
       this.dn = dn;
+      this.storageID = storageID;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -58,6 +60,10 @@ class PendingDataNodeMessages {
     DatanodeDescriptor getNode() {
       return dn;
     }
+    
+    String getStorageID() {
+      return storageID;
+    }
 
     ReplicaState getReportedState() {
       return reportedState;
@@ -70,11 +76,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, block, reportedState));
+        new ReportedBlockInfo(dn, storageID, block, reportedState));
     count++;
   }
   

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Aug 28 06:30:15 2013
@@ -162,7 +162,13 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -174,14 +180,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@@ -193,6 +192,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -201,7 +206,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetryCache;
@@ -3785,7 +3790,7 @@ public class FSNamesystem implements Nam
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
           for (int i = 0; i < descriptors.length; i++) {
-            descriptors[i].addBlock(storedBlock);
+            descriptors[i].addBlock(newtargetstorages[i], storedBlock);
           }
         }
         // add pipeline locations into the INodeUnderConstruction
@@ -5088,11 +5093,11 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      final String poolId, final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {
-      blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+      blockManager.processIncrementalBlockReport(nodeID, poolId, srdb);
     } finally {
       writeUnlock();
     }
@@ -5578,8 +5583,8 @@ public class FSNamesystem implements Nam
         ExtendedBlock blk = blocks[i].getBlock();
         DatanodeInfo[] nodes = blocks[i].getLocations();
         for (int j = 0; j < nodes.length; j++) {
-          DatanodeInfo dn = nodes[j];
-          blockManager.findAndMarkBlockAsCorrupt(blk, dn,
+          //TODO: add "storageID" to LocatedBlock
+          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID", 
               "client machine reported it");
         }
       }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Aug 28 06:30:15 2013
@@ -42,8 +42,8 @@ import org.apache.hadoop.fs.FileAlreadyE
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -59,21 +59,21 @@ import org.apache.hadoop.hdfs.HDFSPolicy
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -89,9 +89,10 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
@@ -943,14 +944,18 @@ class NameNodeRpcServer implements Namen
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
       String poolId, StorageBlockReport[] reports) throws IOException {
     verifyRequest(nodeReg);
-    BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks());
     if(blockStateChangeLog.isDebugEnabled()) {
       blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-           + "from " + nodeReg + " " + blist.getNumberOfBlocks()
-           + " blocks");
+           + "from " + nodeReg + ", reports.length=" + reports.length);
+    }
+    final BlockManager bm = namesystem.getBlockManager(); 
+    for(StorageBlockReport r : reports) {
+      final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+      bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
+    DatanodeDescriptor datanode = bm.getDatanodeManager().getDatanode(nodeReg);
+    datanode.receivedBlockReport();
     if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
       return new FinalizeCommand(poolId);
     return null;
@@ -965,8 +970,9 @@ class NameNodeRpcServer implements Namen
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    namesystem.processIncrementalBlockReport(
-        nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
+    for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+      namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
+    }
   }
 
   @Override // DatanodeProtocol

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java Wed Aug 28 06:30:15 2013
@@ -157,7 +157,7 @@ public class TestFileCorruption {
       ns.writeLock();
       try {
         cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
-            blk, new DatanodeInfo(dnR), "TEST");
+            blk, new DatanodeInfo(dnR), "TEST", "STORAGE_ID");
       } finally {
         ns.writeUnlock();
       }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Wed Aug 28 06:30:15 2013
@@ -124,7 +124,7 @@ public class TestBlockManager {
   
   private void doBasicTest(int testIndex) {
     List<DatanodeDescriptor> origNodes = getNodes(0, 1);
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
 
     DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
     assertEquals(2, pipeline.length);
@@ -307,7 +307,7 @@ public class TestBlockManager {
   private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
     // Originally on only nodes in rack A.
     List<DatanodeDescriptor> origNodes = rackA;
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
     
     assertEquals(2, pipeline.length); // single new copy
@@ -340,7 +340,7 @@ public class TestBlockManager {
       List<DatanodeDescriptor> origNodes)
       throws Exception {
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
-    addBlockOnNodes((long)testIndex, origNodes);
+    addBlockOnNodes(testIndex, origNodes);
     bm.processMisReplicatedBlocks();
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
   }
@@ -353,7 +353,7 @@ public class TestBlockManager {
   private void fulfillPipeline(BlockInfo blockInfo,
       DatanodeDescriptor[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
-      bm.addBlock(pipeline[i], blockInfo, null);
+      bm.addBlock(pipeline[i], "STORAGE_ID", blockInfo, null);
     }
   }
 
@@ -362,7 +362,9 @@ public class TestBlockManager {
     BlockInfo blockInfo = new BlockInfo(block, 3);
 
     for (DatanodeDescriptor dn : nodes) {
-      blockInfo.addNode(dn);
+      for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
+        blockInfo.addStorage(storage);
+      }
     }
     return blockInfo;
   }
@@ -508,12 +510,12 @@ public class TestBlockManager {
     assertTrue(node.isFirstBlockReport());
     // send block report, should be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
     // send block report again, should NOT be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node, never()).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
 
@@ -525,7 +527,7 @@ public class TestBlockManager {
     assertTrue(node.isFirstBlockReport()); // ready for report again
     // send block report, should be processed after restart
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
   }
@@ -550,7 +552,7 @@ public class TestBlockManager {
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Wed Aug 28 06:30:15 2013
@@ -59,17 +59,18 @@ public class TestDatanodeDescriptor {
     assertEquals(0, dd.numBlocks());
     BlockInfo blk = new BlockInfo(new Block(1L), 1);
     BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
+    final String storageID = "STORAGE_ID";
     // add first block
-    assertTrue(dd.addBlock(blk));
+    assertTrue(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(dd.addBlock(blk));
+    assertFalse(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(dd.addBlock(blk1));
+    assertTrue(dd.addBlock(storageID, blk1));
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Wed Aug 28 06:30:15 2013
@@ -43,8 +43,8 @@ public class TestPendingDataNodeMessages
   @Test
   public void testQueues() {
     DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
 
     assertEquals(2, msgs.count());
     

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Wed Aug 28 06:30:15 2013
@@ -291,9 +291,9 @@ public class TestPendingReplication {
       cluster.getNamesystem().writeLock();
       try {
         bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-            "TEST");
+            "STORAGE_ID", "TEST");
         bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
-            "TEST");
+            "STORAGE_ID", "TEST");
       } finally {
         cluster.getNamesystem().writeUnlock();
       }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Wed Aug 28 06:30:15 2013
@@ -1096,7 +1096,7 @@ public class TestReplicationPolicy {
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(info,
-        TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
+        TestReplicationPolicy.dataNodes[0], "STORAGE", ReplicaState.FINALIZED);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java?rev=1518087&r1=1518086&r2=1518087&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Wed Aug 28 06:30:15 2013
@@ -235,7 +235,7 @@ public class TestNameNodeMetrics {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
@@ -286,7 +286,7 @@ public class TestNameNodeMetrics {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }