You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2014/08/01 18:58:07 UTC

svn commit: r1615169 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/blockmanag...

Author: arp
Date: Fri Aug  1 16:58:06 2014
New Revision: 1615169

URL: http://svn.apache.org/r1615169
Log:
HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo where possible. (Arpit Agarwal)

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug  1 16:58:06 2014
@@ -343,6 +343,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6796. Improve the argument check during balancer command line parsing.
     (Benoy Antony via szetszwo)
 
+    HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo
+    where possible (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Aug  1 16:58:06 2014
@@ -1079,6 +1079,7 @@ public class BlockManager {
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
+   * @param storageID if known, null otherwise.
    * @param reason a textual reason why the block should be marked corrupt,
    * for logging purposes
    */
@@ -1095,19 +1096,29 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
-        blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        dn, storageID);
-  }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark " + b
+      throw new IOException("Cannot mark " + blk
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
+    
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+            blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        storageID == null ? null : node.getStorageInfo(storageID),
+        node);
+  }
+
+  /**
+   * 
+   * @param b
+   * @param storageInfo storage that contains the block, if known. null otherwise.
+   * @throws IOException
+   */
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+      DatanodeStorageInfo storageInfo,
+      DatanodeDescriptor node) throws IOException {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
@@ -1118,7 +1129,9 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(storageID, b.stored);
+    if (storageInfo != null) {
+      storageInfo.addBlock(b.stored);
+    }
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1457,7 +1470,7 @@ public class BlockManager {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long)
+   *      List, boolean, Set, long, StorageType)
    */
   public DatanodeStorageInfo[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1694,7 +1707,7 @@ public class BlockManager {
    * @throws IOException
    */
   public boolean processReport(final DatanodeID nodeID,
-      final DatanodeStorage storage, final String poolId,
+      final DatanodeStorage storage,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1726,9 +1739,9 @@ public class BlockManager {
       if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, storage.getStorageID(), newReport);
+        processFirstBlockReport(storageInfo, newReport);
       } else {
-        processReport(node, storage, newReport);
+        processReport(storageInfo, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
@@ -1790,9 +1803,8 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final DatanodeStorage storage,
-      final BlockListAsLongs report) throws IOException {
+  private void processReport(final DatanodeStorageInfo storageInfo,
+                             final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -1802,19 +1814,20 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, storage, report,
+    reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+   
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1828,7 +1841,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storage.getStorageID());
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -1839,16 +1852,16 @@ public class BlockManager {
    * a toRemove list (since there won't be any).  It also silently discards 
    * any invalid blocks, thereby deferring their processing until 
    * the next block report.
-   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param storageInfo - DatanodeStorageInfo that sent the report
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  private void processFirstBlockReport(final DatanodeDescriptor node,
-      final String storageID,
+  private void processFirstBlockReport(
+      final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.getStorageInfo(storageID).numBlocks() == 0);
+    assert (storageInfo.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1857,7 +1870,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, storageID, iblk, reportedState,
+        queueReportedBlock(storageInfo, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1869,15 +1882,16 @@ public class BlockManager {
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
       BlockToMarkCorrupt c = checkReplicaCorrupt(
-          iblk, reportedState, storedBlock, ucState, node);
+          iblk, reportedState, storedBlock, ucState,
+          storageInfo.getDatanodeDescriptor());
       if (c != null) {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, storageID, iblk, reportedState,
+          queueReportedBlock(storageInfo, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node, storageID);
+          markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
         }
         continue;
       }
@@ -1885,7 +1899,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node.getStorageInfo(storageID), iblk, reportedState);
+            storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
@@ -1898,12 +1912,12 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node, storageID);
+        addStoredBlockImmediate(storedBlock, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
+  private void reportDiff(DatanodeStorageInfo storageInfo, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1911,8 +1925,6 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1929,7 +1941,7 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
       // move block to the head of the list
@@ -1966,7 +1978,7 @@ public class BlockManager {
    * BlockInfoUnderConstruction's list of replicas.</li>
    * </ol>
    * 
-   * @param dn descriptor for the datanode that made the report
+   * @param storageInfo DatanodeStorageInfo that sent the report.
    * @param block reported block replica
    * @param reportedState reported replica state
    * @param toAdd add to DatanodeDescriptor
@@ -1978,14 +1990,16 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
-      final String storageID,
+  private BlockInfo processReportedBlock(
+      final DatanodeStorageInfo storageInfo,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
           + " on " + dn + " size " + block.getNumBytes()
@@ -1994,7 +2008,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, storageID, block, reportedState,
+      queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -2034,7 +2048,7 @@ public class BlockManager {
         // TODO: Pretty confident this should be s/storedBlock/block below,
         // since we should be postponing the info of the reported block, not
         // the stored block. See HDFS-6289 for more context.
-        queueReportedBlock(dn, storageID, storedBlock, reportedState,
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -2063,17 +2077,17 @@ public class BlockManager {
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
           " in state " + reportedState + 
-          " from datanode " + dn + " for later processing " +
-          "because " + reason + ".");
+          " from datanode " + storageInfo.getDatanodeDescriptor() +
+          " for later processing because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
   /**
@@ -2096,7 +2110,7 @@ public class BlockManager {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previouly queued message " + rbi);
       }
-      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+      processAndHandleReportedBlock(rbi.getStorageInfo(), 
           rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
@@ -2216,19 +2230,20 @@ public class BlockManager {
   }
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node, String storageID) throws IOException {
+      DatanodeStorageInfo storageInfo) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
-        ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(
+        storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
-    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, storageID, null, true);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+        block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+      addStoredBlock(block, storageInfo, null, true);
     }
   } 
 
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2239,17 +2254,17 @@ public class BlockManager {
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-      DatanodeDescriptor node, String storageID)
+      DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, storageID, null, false);
+      addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storageID, storedBlock);
+    storageInfo.addBlock(storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2271,13 +2286,13 @@ public class BlockManager {
    * @return the block that is stored in blockMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-                               DatanodeDescriptor node,
-                               String storageID,
+                               DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
@@ -2297,7 +2312,7 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storageID, storedBlock);
+    boolean added = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2843,8 +2858,9 @@ public class BlockManager {
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+  void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
       throws IOException {
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
@@ -2864,12 +2880,12 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node,
-      String storageID, Block block,
+  private void processAndHandleReportedBlock(
+      DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2877,7 +2893,9 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, storageID, block, reportedState,
+    final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+    processReportedBlock(storageInfo, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2885,11 +2903,11 @@ public class BlockManager {
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storageID);
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2903,7 +2921,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storageID);
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -2930,13 +2948,15 @@ public class BlockManager {
           "Got incremental block report from unregistered or dead node");
     }
 
-    if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+    DatanodeStorageInfo storageInfo =
+        node.getStorageInfo(srdb.getStorage().getStorageID());
+    if (storageInfo == null) {
       // The DataNode is reporting an unknown storage. Usually the NN learns
       // about new storages from heartbeats but during NN restart we may
       // receive a block report or incremental report before the heartbeat.
       // We must handle this for protocol compatibility. This issue was
       // uncovered by HDFS-6094.
-      node.updateStorage(srdb.getStorage());
+      storageInfo = node.updateStorage(srdb.getStorage());
     }
 
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2946,14 +2966,13 @@ public class BlockManager {
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), ReplicaState.RBW, null);
+        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+                                      ReplicaState.RBW, null);
         break;
       default:
         String msg = 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Fri Aug  1 16:58:06 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  boolean addBlock(BlockInfo b) {
+  public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
     // add to the head of the data-node list

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Aug  1 16:58:06 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 
 /**
  * In the Standby Node, we can receive messages about blocks
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
     
   static class ReportedBlockInfo {
     private final Block block;
-    private final DatanodeDescriptor dn;
-    private final String storageID;
+    private final DatanodeStorageInfo storageInfo;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+    ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
-      this.dn = dn;
-      this.storageID = storageID;
+      this.storageInfo = storageInfo;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
       return block;
     }
 
-    DatanodeDescriptor getNode() {
-      return dn;
-    }
-    
-    String getStorageID() {
-      return storageID;
-    }
-
     ReplicaState getReportedState() {
       return reportedState;
     }
+    
+    DatanodeStorageInfo getStorageInfo() {
+      return storageInfo;
+    }
 
     @Override
     public String toString() {
-      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+      return "ReportedBlockInfo [block=" + block + ", dn="
+          + storageInfo.getDatanodeDescriptor()
           + ", reportedState=" + reportedState + "]";
     }
   }
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
       Queue<ReportedBlockInfo> oldQueue = entry.getValue();
       while (!oldQueue.isEmpty()) {
         ReportedBlockInfo rbi = oldQueue.remove();
-        if (!rbi.getNode().equals(dn)) {
+        if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
           newQueue.add(rbi);
         } else {
           count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, storageID, block, reportedState));
+        new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Aug  1 16:58:06 2014
@@ -4355,8 +4355,11 @@ public class FSNamesystem implements Nam
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
           for (int i = 0; i < trimmedTargets.size(); i++) {
-            trimmedTargets.get(i).addBlock(
-              trimmedStorages.get(i), storedBlock);
+            DatanodeStorageInfo storageInfo =
+                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+            if (storageInfo != null) {
+              storageInfo.addBlock(storedBlock);
+            }
           }
         }
 
@@ -5835,7 +5838,7 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
+      final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Aug  1 16:58:06 2014
@@ -1065,7 +1065,7 @@ class NameNodeRpcServer implements Namen
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
       metrics.incrStorageBlockReportOps();
     }
 
@@ -1101,7 +1101,7 @@ class NameNodeRpcServer implements Namen
           +" blocks.");
     }
     for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
-      namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
+      namesystem.processIncrementalBlockReport(nodeReg, r);
     }
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Aug  1 16:58:06 2014
@@ -368,7 +368,7 @@ public class TestBlockManager {
       DatanodeStorageInfo[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
-      bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+      bm.addBlock(storage, blockInfo, null);
       blockInfo.addStorage(storage);
     }
   }
@@ -549,12 +549,12 @@ public class TestBlockManager {
     // send block report, should be processed
     reset(node);
     
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", 
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
 
@@ -566,7 +566,7 @@ public class TestBlockManager {
     assertEquals(0, ds.getBlockReportCount()); // ready for report again
     // send block report, should be processed after restart
     reset(node);
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
   }
@@ -595,7 +595,7 @@ public class TestBlockManager {
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Fri Aug  1 16:58:06 2014
@@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
     assertTrue(storages.length > 0);
     final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(dd.addBlock(storageID, blk));
+    assertTrue(storages[0].addBlock(blk));
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(dd.addBlock(storageID, blk));
+    assertFalse(storages[0].addBlock(blk));
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(dd.addBlock(storageID, blk1));
+    assertTrue(storages[0].addBlock(blk1));
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Fri Aug  1 16:58:06 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages
   @Test
   public void testQueues() {
     DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
+    DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
+    DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
+    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
 
     assertEquals(2, msgs.count());
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Fri Aug  1 16:58:06 2014
@@ -82,7 +82,7 @@ public class TestReplicationPolicy {
   private static NameNode namenode;
   private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
-  private static DatanodeDescriptor dataNodes[];
+  private static DatanodeDescriptor[] dataNodes;
   private static DatanodeStorageInfo[] storages;
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
@@ -1118,8 +1118,7 @@ public class TestReplicationPolicy {
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
-              ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
-            "STORAGE");
+              ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.