Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2013/12/13 18:28:18 UTC

svn commit: r1550774 [4/10] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apach...

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Dec 13 17:28:14 2013
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
@@ -64,81 +66,87 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-        throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
 
+    // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
-      // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
-            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
-          return localDataNode;
+        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+            localDataNode.getStorageInfos())) {
+          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+            return localStorage;
+          }
         }
       }
     }
 
     // try a node on local node group
-    DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+    DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
-    if (chosenNode != null) {
-      return chosenNode;
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+    if (chosenStorage != null) {
+      return chosenStorage;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+  }
+
+  /** @return the node of the second replica */
+  private static DatanodeDescriptor secondNode(Node localMachine,
+      List<DatanodeStorageInfo> results) {
+    // find the second replica
+    for(DatanodeStorageInfo nextStorage : results) {
+      DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
+      if (nextNode != localMachine) {
+        return nextNode;
+      }
+    }
+    return null;
   }
 
-  
   @Override
-  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-      throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results, 
-                          avoidStaleNodes);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
-      return chooseRandom(NetworkTopology.getFirstHalf(
-                              localMachine.getNetworkLocation()),
-                          excludedNodes, blocksize, 
-                          maxNodesPerRack, results, 
-                          avoidStaleNodes);
+      final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
+      return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(DatanodeDescriptor nextNode : results) {
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
+      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results, avoidStaleNodes, storageType);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results, avoidStaleNodes, storageType);
       }
     }
   }
@@ -146,8 +154,9 @@ public class BlockPlacementPolicyWithNod
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
-      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes) throws NotEnoughReplicasException {
+      long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes, StorageType storageType)
+          throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -155,12 +164,12 @@ public class BlockPlacementPolicyWithNod
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes);
+          maxReplicasPerRack, results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes);
+          maxReplicasPerRack, results, avoidStaleNodes, storageType);
     }
   }
 
@@ -170,46 +179,40 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(
+  private DatanodeStorageInfo chooseLocalNodeGroup(
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-      throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results, avoidStaleNodes);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     }
 
     // choose one from the local node group
     try {
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
+          storageType);
     } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(DatanodeDescriptor nextNode : results) {
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
+      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
         try {
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
               excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes);
+              avoidStaleNodes, storageType);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results, avoidStaleNodes, storageType);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results, avoidStaleNodes, storageType);
       }
     }
   }
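
The rewritten chooseLocalStorage falls back through placement scopes in order: a storage on the writer's own node, then the local node group, then the local rack, and finally a random node. Within the local node it shuffles the node's storages and probes each until one qualifies. A minimal self-contained sketch of that shuffle-and-probe idiom (Target and isGoodTarget are hypothetical stand-ins, not HDFS types):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ShuffleProbeSketch {
      interface Target {
        boolean isGoodTarget();
      }

      /** Probe candidates in random order; return the first good one, or null. */
      static Target chooseShuffled(Target[] candidates) {
        List<Target> shuffled = Arrays.asList(candidates.clone());
        Collections.shuffle(shuffled);   // plays the role of DFSUtil.shuffle(...)
        for (Target t : shuffled) {
          if (t.isGoodTarget()) {        // plays the role of addIfIsGoodTarget(...) >= 0
            return t;
          }
        }
        return null;                     // caller falls back to the next placement scope
      }
    }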

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Fri Dec 13 17:28:14 2013
@@ -29,11 +29,11 @@ import org.apache.hadoop.util.LightWeigh
  * the datanodes that store the block.
  */
 class BlocksMap {
-  private static class NodeIterator implements Iterator<DatanodeDescriptor> {
+  private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
     private BlockInfo blockInfo;
     private int nextIdx = 0;
       
-    NodeIterator(BlockInfo blkInfo) {
+    StorageIterator(BlockInfo blkInfo) {
       this.blockInfo = blkInfo;
     }
 
@@ -44,8 +44,8 @@ class BlocksMap {
     }
 
     @Override
-    public DatanodeDescriptor next() {
-      return blockInfo.getDatanode(nextIdx++);
+    public DatanodeStorageInfo next() {
+      return blockInfo.getStorageInfo(nextIdx++);
     }
 
     @Override
@@ -115,18 +115,23 @@ class BlocksMap {
 
   /**
    * Searches for the block in the BlocksMap and 
-   * returns Iterator that iterates through the nodes the block belongs to.
+   * returns {@link Iterable} of the storages the block belongs to.
    */
-  Iterator<DatanodeDescriptor> nodeIterator(Block b) {
-    return nodeIterator(blocks.get(b));
+  Iterable<DatanodeStorageInfo> getStorages(Block b) {
+    return getStorages(blocks.get(b));
   }
 
   /**
    * For a block that has already been retrieved from the BlocksMap
-   * returns Iterator that iterates through the nodes the block belongs to.
+   * returns {@link Iterable} of the storages the block belongs to.
    */
-  Iterator<DatanodeDescriptor> nodeIterator(BlockInfo storedBlock) {
-    return new NodeIterator(storedBlock);
+  Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
+    return new Iterable<DatanodeStorageInfo>() {
+      @Override
+      public Iterator<DatanodeStorageInfo> iterator() {
+        return new StorageIterator(storedBlock);
+      }
+    };
   }
 
   /** counts number of containing nodes. Better than using iterator. */
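
Returning an Iterable rather than a raw Iterator is what lets call sites use the enhanced for loop, e.g. for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { ... }, with each loop obtaining a fresh StorageIterator. The same wrapping idiom in isolation (a generic sketch, not the HDFS classes):

    import java.util.Iterator;

    public class IterableWrapSketch {
      /** Wrap an array so every for-each loop gets a fresh iterator. */
      static <T> Iterable<T> asIterable(final T[] items) {
        return new Iterable<T>() {
          @Override
          public Iterator<T> iterator() {
            return new Iterator<T>() {
              private int nextIdx = 0;
              @Override public boolean hasNext() { return nextIdx < items.length; }
              @Override public T next() { return items[nextIdx++]; }
              @Override public void remove() { throw new UnsupportedOperationException(); }
            };
          }
        };
      }
    }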

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Dec 13 17:28:14 2013
@@ -18,23 +18,29 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -43,6 +49,7 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
+  public static final Log LOG = LogFactory.getLog(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
 
   // Stores status of decommissioning.
@@ -54,9 +61,9 @@ public class DatanodeDescriptor extends 
   @InterfaceStability.Evolving
   public static class BlockTargetPair {
     public final Block block;
-    public final DatanodeDescriptor[] targets;    
+    public final DatanodeStorageInfo[] targets;    
 
-    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+    BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
       this.block = block;
       this.targets = targets;
     }
@@ -99,6 +106,9 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  private final Map<String, DatanodeStorageInfo> storageMap = 
+      new HashMap<String, DatanodeStorageInfo>();
+
   /**
    * A list of CachedBlock objects on this datanode.
    */
@@ -164,37 +174,11 @@ public class DatanodeDescriptor extends 
    */
   private long lastCachingDirectiveSentTimeMs;
 
-  /**
-   * Head of the list of blocks on the datanode
-   */
-  private volatile BlockInfo blockList = null;
-  /**
-   * Number of blocks on the datanode
-   */
-  private int numBlocks = 0;
-
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
-  /**
-   * Set to false on any NN failover, and reset to true
-   * whenever a block report is received.
-   */
-  private boolean heartbeatedSinceFailover = false;
-  
-  /**
-   * At startup or at any failover, the DNs in the cluster may
-   * have pending block deletions from a previous incarnation
-   * of the NameNode. Thus, we consider their block contents
-   * stale until we have received a block report. When a DN
-   * is considered stale, any replicas on it are transitively
-   * considered stale. If any block has at least one stale replica,
-   * then no invalidations will be processed for this block.
-   * See HDFS-1972.
-   */
-  private boolean blockContentsStale = true;
   
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
@@ -213,7 +197,7 @@ public class DatanodeDescriptor extends 
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
-   * this datanode. This count is approximate and might be slightly bigger
+   * this storage. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
@@ -223,9 +207,6 @@ public class DatanodeDescriptor extends 
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
   
-  /** Set to false after processing first block report */
-  private boolean firstBlockReport = true;
-  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -237,7 +218,8 @@ public class DatanodeDescriptor extends 
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+    super(nodeID);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
@@ -247,104 +229,60 @@ public class DatanodeDescriptor extends 
    */
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation) {
-    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
-  }
-  
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param capacity capacity of the data node
-   * @param dfsUsed space used by the data node
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param cacheCapacity cache capacity of the data node
-   * @param cacheUsed cache used on the data node
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID, 
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            long cacheCapacity,
-                            long cacheUsed,
-                            int xceiverCount,
-                            int failedVolumes) {
-    super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
-        cacheUsed, xceiverCount, failedVolumes);
-  }
-
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param networkLocation location of the data node in network
-   * @param capacity capacity of the data node, including space used by non-dfs
-   * @param dfsUsed the used space by dfs datanode
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param cacheCapacity cache capacity of the data node
-   * @param cacheUsed cache used on the data node
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID,
-                            String networkLocation,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            long cacheCapacity,
-                            long cacheUsed,
-                            int xceiverCount,
-                            int failedVolumes) {
     super(nodeID, networkLocation);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
-        cacheUsed, xceiverCount, failedVolumes);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
-   * Add datanode to the block.
-   * Add block to the head of the list of blocks belonging to the data-node.
+   * Add data-node to the block. Add block to the head of the list of blocks
+   * belonging to the data-node.
    */
-  public boolean addBlock(BlockInfo b) {
-    if(!b.addNode(this))
-      return false;
-    // add to the head of the data-node list
-    blockList = b.listInsert(blockList, this);
-    numBlocks++;
-    return true;
+  public boolean addBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.addBlock(b);
+    }
+    return false;
   }
-  
-  /**
-   * Remove block from the list of blocks belonging to the data-node.
-   * Remove datanode from the block.
-   */
-  public boolean removeBlock(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    if ( b.removeNode(this) ) {
-      numBlocks--;
-      return true;
-    } else {
-      return false;
+
+  DatanodeStorageInfo getStorageInfo(String storageID) {
+    synchronized (storageMap) {
+      return storageMap.get(storageID);
+    }
+  }
+  DatanodeStorageInfo[] getStorageInfos() {
+    synchronized (storageMap) {
+      final Collection<DatanodeStorageInfo> storages = storageMap.values();
+      return storages.toArray(new DatanodeStorageInfo[storages.size()]);
     }
   }
 
   /**
-   * Move block to the head of the list of blocks belonging to the data-node.
-   * @return the index of the head of the blockList
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
    */
-  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
-    return curIndex;
+  boolean removeBlock(BlockInfo b) {
+    int index = b.findStorageInfo(this);
+    // if block exists on this datanode
+    if (index >= 0) {
+      DatanodeStorageInfo s = b.getStorageInfo(index);
+      if (s != null) {
+        return s.removeBlock(b);
+      }
+    }
+    return false;
   }
-
+  
   /**
-   * Used for testing only
-   * @return the head of the blockList
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
    */
-  @VisibleForTesting
-  protected BlockInfo getHead(){
-    return blockList;
+  boolean removeBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.removeBlock(b);
+    }
+    return false;
   }
 
   /**
@@ -355,9 +293,12 @@ public class DatanodeDescriptor extends 
    * @return the new block
    */
   public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    boolean done = removeBlock(oldBlock);
+    int index = oldBlock.findStorageInfo(this);
+    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
+    boolean done = s.removeBlock(oldBlock);
     assert done : "Old block should belong to the data-node when replacing";
-    done = addBlock(newBlock);
+
+    done = s.addBlock(newBlock);
     assert done : "New block should not belong to the data-node when replacing";
     return newBlock;
   }
@@ -368,7 +309,6 @@ public class DatanodeDescriptor extends 
     setBlockPoolUsed(0);
     setDfsUsed(0);
     setXceiverCount(0);
-    this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
     // pendingCached, cached, and pendingUncached are protected by the
@@ -392,66 +332,97 @@ public class DatanodeDescriptor extends 
   }
 
   public int numBlocks() {
-    return numBlocks;
+    int blocks = 0;
+    for (DatanodeStorageInfo entry : getStorageInfos()) {
+      blocks += entry.numBlocks();
+    }
+    return blocks;
   }
 
   /**
    * Updates stats from datanode heartbeat.
    */
-  public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
-      int volFailures) {
-    setCapacity(capacity);
-    setRemaining(remaining);
-    setBlockPoolUsed(blockPoolUsed);
-    setDfsUsed(dfsUsed);
+  public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
+      long cacheUsed, int xceiverCount, int volFailures) {
+    long totalCapacity = 0;
+    long totalRemaining = 0;
+    long totalBlockPoolUsed = 0;
+    long totalDfsUsed = 0;
+
     setCacheCapacity(cacheCapacity);
     setCacheUsed(cacheUsed);
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
-    this.heartbeatedSinceFailover = true;
+    for (StorageReport report : reports) {
+      DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
+      if (storage == null) {
+        // This is seen during cluster initialization when the heartbeat
+        // is received before the initial block reports from each storage.
+        storage = updateStorage(new DatanodeStorage(report.getStorageID()));
+      }
+      storage.receivedHeartbeat(report);
+      totalCapacity += report.getCapacity();
+      totalRemaining += report.getRemaining();
+      totalBlockPoolUsed += report.getBlockPoolUsed();
+      totalDfsUsed += report.getDfsUsed();
+    }
     rollBlocksScheduled(getLastUpdate());
+
+    // Update total metrics for the node.
+    setCapacity(totalCapacity);
+    setRemaining(totalRemaining);
+    setBlockPoolUsed(totalBlockPoolUsed);
+    setDfsUsed(totalDfsUsed);
   }
 
-  /**
-   * Iterates over the list of blocks belonging to the datanode.
-   */
-  public static class BlockIterator implements Iterator<BlockInfo> {
-    private BlockInfo current;
-    private DatanodeDescriptor node;
-      
-    BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
-      this.current = head;
-      this.node = dn;
+  private static class BlockIterator implements Iterator<BlockInfo> {
+    private int index = 0;
+    private final List<Iterator<BlockInfo>> iterators;
+    
+    private BlockIterator(final DatanodeStorageInfo... storages) {
+      List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
+      for (DatanodeStorageInfo e : storages) {
+        iterators.add(e.getBlockIterator());
+      }
+      this.iterators = Collections.unmodifiableList(iterators);
     }
 
     @Override
     public boolean hasNext() {
-      return current != null;
+      update();
+      return !iterators.isEmpty() && iterators.get(index).hasNext();
     }
 
     @Override
     public BlockInfo next() {
-      BlockInfo res = current;
-      current = current.getNext(current.findDatanode(node));
-      return res;
+      update();
+      return iterators.get(index).next();
     }
-
+    
     @Override
-    public void remove()  {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
+    public void remove() {
+      throw new UnsupportedOperationException("Remove unsupported.");
+    }
+    
+    private void update() {
+      while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
+        index++;
+      }
     }
   }
 
-  public Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(this.blockList, this);
+  Iterator<BlockInfo> getBlockIterator() {
+    return new BlockIterator(getStorageInfos());
+  }
+  Iterator<BlockInfo> getBlockIterator(final String storageID) {
+    return new BlockIterator(getStorageInfo(storageID));
   }
 
   /**
    * Store block replication work.
    */
-  void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+  void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
     assert(block != null && targets != null && targets.length > 0);
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
@@ -526,18 +497,14 @@ public class DatanodeDescriptor extends 
   public int getBlocksScheduled() {
     return currApproxBlocksScheduled + prevApproxBlocksScheduled;
   }
-  
-  /**
-   * Increments counter for number of blocks scheduled. 
-   */
-  public void incBlocksScheduled() {
+
+  /** Increment the number of blocks scheduled. */
+  void incrementBlocksScheduled() {
     currApproxBlocksScheduled++;
   }
   
-  /**
-   * Decrements counter for number of blocks scheduled.
-   */
-  void decBlocksScheduled() {
+  /** Decrement the number of blocks scheduled. */
+  void decrementBlocksScheduled() {
     if (prevApproxBlocksScheduled > 0) {
       prevApproxBlocksScheduled--;
     } else if (currApproxBlocksScheduled > 0) {
@@ -546,12 +513,9 @@ public class DatanodeDescriptor extends 
     // its ok if both counters are zero.
   }
   
-  /**
-   * Adjusts curr and prev number of blocks scheduled every few minutes.
-   */
+  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
   private void rollBlocksScheduled(long now) {
-    if ((now - lastBlocksScheduledRollTime) > 
-        BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
       prevApproxBlocksScheduled = currApproxBlocksScheduled;
       currApproxBlocksScheduled = 0;
       lastBlocksScheduledRollTime = now;
@@ -647,7 +611,11 @@ public class DatanodeDescriptor extends 
   @Override
   public void updateRegInfo(DatanodeID nodeReg) {
     super.updateRegInfo(nodeReg);
-    firstBlockReport = true; // must re-process IBR after re-registration
+    
+    // must re-process IBR after re-registration
+    for(DatanodeStorageInfo storage : getStorageInfos()) {
+      storage.setBlockReportCount(0);
+    }
   }
 
   /**
@@ -664,26 +632,6 @@ public class DatanodeDescriptor extends 
     this.bandwidth = bandwidth;
   }
 
-  public boolean areBlockContentsStale() {
-    return blockContentsStale;
-  }
-
-  public void markStaleAfterFailover() {
-    heartbeatedSinceFailover = false;
-    blockContentsStale = true;
-  }
-
-  public void receivedBlockReport() {
-    if (heartbeatedSinceFailover) {
-      blockContentsStale = false;
-    }
-    firstBlockReport = false;
-  }
-  
-  boolean isFirstBlockReport() {
-    return firstBlockReport;
-  }
-
   @Override
   public String dumpDatanode() {
     StringBuilder sb = new StringBuilder(super.dumpDatanode());
@@ -702,6 +650,19 @@ public class DatanodeDescriptor extends 
     return sb.toString();
   }
 
+  DatanodeStorageInfo updateStorage(DatanodeStorage s) {
+    synchronized (storageMap) {
+      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      if (storage == null) {
+        LOG.info("Adding new storage ID " + s.getStorageID() +
+                 " for DN " + getXferAddr());
+        storage = new DatanodeStorageInfo(this, s);
+        storageMap.put(s.getStorageID(), storage);
+      }
+      return storage;
+    }
+  }
+
   /**
    * @return   The time at which we last sent caching directives to this 
    *           DataNode, in monotonic milliseconds.
@@ -718,3 +679,4 @@ public class DatanodeDescriptor extends 
     this.lastCachingDirectiveSentTimeMs = time;
   }
 }
+
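
The new BlockIterator above replaces the old walk over a per-datanode linked list with a concatenation of one iterator per storage; the update() step skips exhausted iterators before each access. The same concatenation pattern as a standalone generic class (a sketch mirroring the diff, not the HDFS inner class):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class ConcatIterator<T> implements Iterator<T> {
      private int index = 0;
      private final List<Iterator<T>> iterators;

      public ConcatIterator(List<Iterator<T>> iterators) {
        this.iterators =
            Collections.unmodifiableList(new ArrayList<Iterator<T>>(iterators));
      }

      @Override
      public boolean hasNext() {
        advance();
        return !iterators.isEmpty() && iterators.get(index).hasNext();
      }

      @Override
      public T next() {
        advance();
        return iterators.get(index).next();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Remove unsupported.");
      }

      /** Skip past exhausted iterators, stopping at the last one. */
      private void advance() {
        while (index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
          index++;
        }
      }
    }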

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Dec 13 17:28:14 2013
@@ -424,9 +424,13 @@ public class DatanodeManager {
   }
 
 
-  /** Get a datanode descriptor given corresponding storageID */
-  DatanodeDescriptor getDatanode(final String storageID) {
-    return datanodeMap.get(storageID);
+  /** Get a datanode descriptor given the corresponding DatanodeUuid. */
+  DatanodeDescriptor getDatanode(final String datanodeUuid) {
+    if (datanodeUuid == null) {
+      return null;
+    }
+
+    return datanodeMap.get(datanodeUuid);
   }
 
   /**
@@ -438,7 +442,7 @@ public class DatanodeManager {
    */
   public DatanodeDescriptor getDatanode(DatanodeID nodeID
       ) throws UnregisteredNodeException {
-    final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+    final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
     if (node == null) 
       return null;
     if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
@@ -451,6 +455,20 @@ public class DatanodeManager {
     return node;
   }
 
+  public DatanodeStorageInfo[] getDatanodeStorageInfos(
+      DatanodeID[] datanodeID, String[] storageIDs)
+          throws UnregisteredNodeException {
+    if (datanodeID.length == 0) {
+      return null;
+    }
+    final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
+    for(int i = 0; i < datanodeID.length; i++) {
+      final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
+      storages[i] = dd.getStorageInfo(storageIDs[i]);
+    }
+    return storages; 
+  }
+
   /** Prints information about all datanodes. */
   void datanodeDump(final PrintWriter out) {
     synchronized (datanodeMap) {
@@ -528,7 +546,7 @@ public class DatanodeManager {
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
     synchronized(datanodeMap) {
-      host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
+      host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
     }
 
     networktopology.add(node); // may throw InvalidTopologyException
@@ -543,7 +561,7 @@ public class DatanodeManager {
 
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) {
-    final String key = node.getStorageID();
+    final String key = node.getDatanodeUuid();
     synchronized (datanodeMap) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
@@ -705,8 +723,10 @@ public class DatanodeManager {
   /** Start decommissioning the specified datanode. */
   private void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning " + node + " with " + 
-          node.numBlocks() +  " blocks");
+      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+        LOG.info("Start Decommissioning " + node + " " + storage
+            + " with " + storage.numBlocks() + " blocks");
+      }
       heartbeatManager.startDecommission(node);
       node.decommissioningStatus.setStartTime(now());
       
@@ -729,24 +749,6 @@ public class DatanodeManager {
   }
 
   /**
-   * Generate new storage ID.
-   * 
-   * @return unique storage ID
-   * 
-   * Note: that collisions are still possible if somebody will try 
-   * to bring in a data storage from a different cluster.
-   */
-  private String newStorageID() {
-    String newID = null;
-    while(newID == null) {
-      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (datanodeMap.get(newID) != null)
-        newID = null;
-    }
-    return newID;
-  }
-
-  /**
    * Register the given datanode with the namenode. NB: the given
    * registration is mutated and given back to the datanode.
    *
@@ -784,9 +786,9 @@ public class DatanodeManager {
       }
         
       NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
-          + nodeReg + " storage " + nodeReg.getStorageID());
+          + nodeReg + " storage " + nodeReg.getDatanodeUuid());
   
-      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
       DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
           nodeReg.getIpAddr(), nodeReg.getXferPort());
         
@@ -821,7 +823,7 @@ public class DatanodeManager {
          */        
           NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
               + " is replaced by " + nodeReg + " with the same storageID "
-              + nodeReg.getStorageID());
+              + nodeReg.getDatanodeUuid());
         }
         
         boolean success = false;
@@ -853,20 +855,8 @@ public class DatanodeManager {
           }
         }
         return;
-      } 
-  
-      // this is a new datanode serving a new data storage
-      if ("".equals(nodeReg.getStorageID())) {
-        // this data storage has never been registered
-        // it is either empty or was created by pre-storageID version of DFS
-        nodeReg.setStorageID(newStorageID());
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* NameSystem.registerDatanode: "
-              + "new storageID " + nodeReg.getStorageID() + " assigned.");
-        }
       }
-      
+
       DatanodeDescriptor nodeDescr 
         = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
       boolean success = false;
@@ -1234,10 +1224,10 @@ public class DatanodeManager {
 
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
-      final String blockPoolId,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
-      int failedVolumes) throws IOException {
+      StorageReport[] reports, final String blockPoolId,
+      long cacheCapacity, long cacheUsed, int xceiverCount, 
+      int maxTransfers, int failedVolumes
+      ) throws IOException {
     synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
         DatanodeDescriptor nodeinfo = null;
@@ -1257,9 +1247,9 @@ public class DatanodeManager {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 
-        heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
-            remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
-            failedVolumes);
+        heartbeatManager.updateHeartbeat(nodeinfo, reports,
+                                         cacheCapacity, cacheUsed,
+                                         xceiverCount, failedVolumes);
 
         // If we are in safemode, do not send back any recovery / replication
         // requests. Don't even drain the existing queue of work.
@@ -1274,32 +1264,32 @@ public class DatanodeManager {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
           for (BlockInfoUnderConstruction b : blocks) {
-            DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+            final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
             // Skip stale nodes during recovery - not heartbeated for some time (30s by default).
-            List<DatanodeDescriptor> recoveryLocations =
-                new ArrayList<DatanodeDescriptor>(expectedLocations.length);
-            for (int i = 0; i < expectedLocations.length; i++) {
-              if (!expectedLocations[i].isStale(this.staleInterval)) {
-                recoveryLocations.add(expectedLocations[i]);
+            final List<DatanodeStorageInfo> recoveryLocations =
+                new ArrayList<DatanodeStorageInfo>(storages.length);
+            for (int i = 0; i < storages.length; i++) {
+              if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+                recoveryLocations.add(storages[i]);
               }
             }
             // If we only get 1 replica after eliminating stale nodes, then choose all
             // replicas for recovery and let the primary data node handle failures.
             if (recoveryLocations.size() > 1) {
-              if (recoveryLocations.size() != expectedLocations.length) {
+              if (recoveryLocations.size() != storages.length) {
                 LOG.info("Skipped stale nodes for recovery : " +
-                    (expectedLocations.length - recoveryLocations.size()));
+                    (storages.length - recoveryLocations.size()));
               }
               brCommand.add(new RecoveringBlock(
                   new ExtendedBlock(blockPoolId, b),
-                  recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                  DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
                   b.getBlockRecoveryId()));
             } else {
               // If too many replicas are stale, then choose all replicas to participate
               // in block recovery.
               brCommand.add(new RecoveringBlock(
                   new ExtendedBlock(blockPoolId, b),
-                  expectedLocations,
+                  DatanodeStorageInfo.toDatanodeInfos(storages),
                   b.getBlockRecoveryId()));
             }
           }
@@ -1416,7 +1406,9 @@ public class DatanodeManager {
     LOG.info("Marking all datandoes as stale");
     synchronized (datanodeMap) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
-        dn.markStaleAfterFailover();
+        for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+          storage.markStaleAfterFailover();
+        }
       }
     }
   }
@@ -1455,3 +1447,4 @@ public class DatanodeManager {
     this.shouldSendCachingCommands = shouldSendCachingCommands;
   }
 }
+
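
In the block recovery path above, replicas on stale datanodes (no heartbeat within staleInterval, 30s by default) are skipped, unless that would leave one or zero candidates, in which case all replicas participate and the primary datanode handles any failures. That selection rule in isolation (Replica is a hypothetical stand-in type):

    import java.util.ArrayList;
    import java.util.List;

    public class RecoveryLocationSketch {
      interface Replica {
        boolean isStale(long staleIntervalMs);
      }

      /** Prefer non-stale replicas; fall back to all replicas if too few survive. */
      static List<Replica> chooseRecoveryLocations(List<Replica> all, long staleIntervalMs) {
        List<Replica> fresh = new ArrayList<Replica>(all.size());
        for (Replica r : all) {
          if (!r.isStale(staleIntervalMs)) {
            fresh.add(r);
          }
        }
        // With zero or one fresh replicas, recover with the full set rather
        // than risk an unrecoverable block.
        return fresh.size() > 1 ? fresh : all;
      }
    }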

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Dec 13 17:28:14 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -181,7 +182,7 @@ class HeartbeatManager implements Datano
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
     }
   }
 
@@ -203,11 +204,11 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xceiverCount, int failedVolumes) {
     stats.subtract(node);
-    node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-        cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
+    node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
+      xceiverCount, failedVolumes);
     stats.add(node);
   }
 
@@ -358,3 +359,4 @@ class HeartbeatManager implements Datano
     }
   }
 }
+
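
updateHeartbeat above keeps the cluster-wide statistics consistent by subtracting the node's previous contribution, applying the new per-storage reports, and adding the node back, all under the monitor lock. The subtract-mutate-add idiom in isolation (Stats and Node are simplified stand-ins):

    public class StatsUpdateSketch {
      static class Node {
        long capacity;
      }

      static class Stats {
        long totalCapacity;
        void subtract(Node n) { totalCapacity -= n.capacity; }
        void add(Node n)      { totalCapacity += n.capacity; }
      }

      /** Keep the aggregate consistent while one node's metrics change. */
      static synchronized void updateNode(Stats stats, Node node, long newCapacity) {
        stats.subtract(node);         // remove the node's stale contribution
        node.capacity = newCapacity;  // plays the role of node.updateHeartbeat(reports, ...)
        stats.add(node);              // re-add the refreshed contribution
      }
    }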

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Dec 13 17:28:14 2013
@@ -78,10 +78,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getStorageID(), set);
+      node2blocks.put(datanode.getDatanodeUuid(), set);
     }
     if (set.add(block)) {
       numBlocks++;

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java Fri Dec 13 17:28:14 2013
@@ -34,5 +34,5 @@ public interface MutableBlockCollection 
    * and set the locations.
    */
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeDescriptor[] locations) throws IOException;
+      DatanodeStorageInfo[] storages) throws IOException;
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Dec 13 17:28:14 2013
@@ -42,11 +42,13 @@ class PendingDataNodeMessages {
   static class ReportedBlockInfo {
     private final Block block;
     private final DatanodeDescriptor dn;
+    private final String storageID;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
         ReplicaState reportedState) {
       this.dn = dn;
+      this.storageID = storageID;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -58,6 +60,10 @@ class PendingDataNodeMessages {
     DatanodeDescriptor getNode() {
       return dn;
     }
+    
+    String getStorageID() {
+      return storageID;
+    }
 
     ReplicaState getReportedState() {
       return reportedState;
@@ -70,11 +76,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, block, reportedState));
+        new ReportedBlockInfo(dn, storageID, block, reportedState));
     count++;
   }
   

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Dec 13 17:28:14 2013
@@ -236,6 +236,8 @@ public abstract class Storage extends St
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
     FileLock lock;                // storage lock
+
+    private String storageUuid = null;      // Storage directory identifier.
     
     public StorageDirectory(File dir) {
       // default dirType is null
@@ -246,6 +248,14 @@ public abstract class Storage extends St
       this(dir, dirType, true);
     }
     
+    public void setStorageUuid(String storageUuid) {
+      this.storageUuid = storageUuid;
+    }
+
+    public String getStorageUuid() {
+      return storageUuid;
+    }
+
     /**
      * Constructor
      * @param dir directory corresponding to the storage

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Dec 13 17:28:14 2013
@@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteA
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -160,31 +161,32 @@ class BPOfferService {
   synchronized NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
-  
+
   @Override
   public String toString() {
     if (bpNSInfo == null) {
       // If we haven't yet connected to our NN, we don't yet know our
       // own block pool ID.
       // If _none_ of the block pools have connected yet, we don't even
-      // know the storage ID of this DN.
-      String storageId = dn.getStorageId();
-      if (storageId == null || "".equals(storageId)) {
-        storageId = "unknown";
+      // know the Datanode UUID of this DN.
+      String datanodeUuid = dn.getDatanodeUuid();
+
+      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+        datanodeUuid = "unassigned";
       }
-      return "Block pool <registering> (storage id " + storageId +
-        ")";
+      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
     } else {
       return "Block pool " + getBlockPoolId() +
-        " (storage id " + dn.getStorageId() +
-        ")";
+          " (Datanode Uuid " + dn.getDatanodeUuid() +
+          ")";
     }
   }
   
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+                       String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block);
+      actor.reportBadBlocks(block, storageUuid, storageType);
     }
   }
   
@@ -193,7 +195,8 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
     checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -202,7 +205,7 @@ class BPOfferService {
         delHint);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -219,23 +222,23 @@ class BPOfferService {
         "delHint is null");
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo);
+      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
     }
   }
   
-  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -274,12 +277,22 @@ class BPOfferService {
   synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
     if (this.bpNSInfo == null) {
       this.bpNSInfo = nsInfo;
-      
+      boolean success = false;
+
       // Now that we know the namespace ID, etc, we can pass this to the DN.
       // The DN can now initialize its local storage if we are the
       // first BP to handshake, etc.
-      dn.initBlockPool(this);
-      return;
+      try {
+        dn.initBlockPool(this);
+        success = true;
+      } finally {
+        if (!success) {
+          // The datanode failed to initialize the BP. We need to reset
+          // the namespace info so that other BPService actors still have
+          // a chance to set it and re-initialize the datanode.
+          this.bpNSInfo = null;
+        }
+      }
     } else {
       checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
           "Blockpool ID");
@@ -328,7 +341,7 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
+  synchronized DatanodeRegistration createRegistration() throws IOException {
     Preconditions.checkState(bpNSInfo != null,
         "getRegistration() can only be called after initial handshake");
     return dn.createBPRegistration(bpNSInfo);

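The verifyAndSetNamespaceInfo() hunk above introduces a set-then-roll-back idiom: bpNSInfo is assigned eagerly and cleared again in a finally block if initBlockPool() throws, so that a later BPServiceActor retains a chance to complete the handshake. A minimal sketch of the same control flow, using illustrative stand-ins rather than the real NamespaceInfo and DataNode types:

// Sketch of the set-then-roll-back idiom from verifyAndSetNamespaceInfo().
// NsState, verifyAndSet() and initBlockPool() are stand-ins, not HDFS types.
class NsState {
  private Object nsInfo; // stands in for NamespaceInfo

  synchronized void verifyAndSet(Object candidate) throws Exception {
    if (nsInfo != null) {
      return; // already set; the real code cross-checks IDs instead
    }
    nsInfo = candidate;
    boolean success = false;
    try {
      initBlockPool(); // may throw
      success = true;
    } finally {
      if (!success) {
        nsInfo = null; // roll back so another actor can retry
      }
    }
  }

  private void initBlockPool() throws Exception {
    // stand-in for dn.initBlockPool(this)
  }
}
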
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Dec 13 17:28:14 2013
@@ -22,7 +22,6 @@ import static org.apache.hadoop.util.Tim
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
@@ -100,9 +99,9 @@ class BPServiceActor implements Runnable
    * keyed by block ID, contains the pending changes which have yet to be
    * reported to the NN. Access should be synchronized on this object.
    */
-  private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR 
-    = Maps.newHashMap();
-  
+  private final Map<String, PerStoragePendingIncrementalBR>
+      pendingIncrementalBRperStorage = Maps.newHashMap();
+
   private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -244,12 +243,15 @@ class BPServiceActor implements Runnable
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+      String storageUuid, StorageType storageType) {
     if (bpRegistration == null) {
       return;
     }
     DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+    String[] uuids = { storageUuid };
+    StorageType[] types = { storageType };
+    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
     
     try {
       bpNamenode.reportBadBlocks(blocks);  
@@ -263,69 +265,120 @@ class BPServiceActor implements Runnable
   }
   
   /**
-   * Report received blocks and delete hints to the Namenode
-   * 
+   * Report received blocks and delete hints to the Namenode for each
+   * storage.
+   *
    * @throws IOException
    */
   private void reportReceivedDeletedBlocks() throws IOException {
 
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    synchronized (pendingIncrementalBR) {
-      int numBlocks = pendingIncrementalBR.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = pendingIncrementalBR
-            .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+    // Generate a list of the pending reports for each storage under the lock
+    Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap();
+    synchronized (pendingIncrementalBRperStorage) {
+      for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+           pendingIncrementalBRperStorage.entrySet()) {
+        final String storageUuid = entry.getKey();
+        final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+
+        if (perStorageMap.getBlockInfoCount() > 0) {
+          // Send newly-received and deleted blockids to namenode
+          ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
+          pendingReceivedRequests =
+              (pendingReceivedRequests > rdbi.length ?
+                  (pendingReceivedRequests - rdbi.length) : 0);
+          blockArrays.put(storageUuid, rdbi);
+        }
       }
-      pendingIncrementalBR.clear();
     }
-    if (receivedAndDeletedBlockArray != null) {
+
+    // Send incremental block reports to the Namenode outside the lock
+    for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry :
+         blockArrays.entrySet()) {
+      final String storageUuid = entry.getKey();
+      final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
+
       StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
+          storageUuid, rdbi) };
       boolean success = false;
       try {
-        bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
-            report);
+        bpNamenode.blockReceivedAndDeleted(bpRegistration,
+            bpos.getBlockPoolId(), report);
         success = true;
       } finally {
-        synchronized (pendingIncrementalBR) {
-          if (!success) {
+        if (!success) {
+          synchronized (pendingIncrementalBRperStorage) {
             // If we didn't succeed in sending the report, put all of the
-            // blocks back onto our queue, but only in the case where we didn't
-            // put something newer in the meantime.
-            for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
-              if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
-                pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
-              }
-            }
+            // blocks back onto our queue, but only in the case where we
+            // didn't put something newer in the meantime.
+            PerStoragePendingIncrementalBR perStorageMap =
+                pendingIncrementalBRperStorage.get(storageUuid);
+            pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi);
           }
-          pendingReceivedRequests = pendingIncrementalBR.size();
         }
       }
     }
   }
 
+  /**
+   * Retrieve the incremental BR state for a given storage UUID.
+   * @param storageUuid storage to look up, creating state on first use
+   * @return the pending incremental BR state for that storage
+   */
+  private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+      String storageUuid) {
+    PerStoragePendingIncrementalBR mapForStorage =
+        pendingIncrementalBRperStorage.get(storageUuid);
+
+    if (mapForStorage == null) {
+      // This is the first time we are adding incremental BR state for
+      // this storage so create a new map. This is required once per
+      // storage, per service actor.
+      mapForStorage = new PerStoragePendingIncrementalBR();
+      pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+    }
+
+    return mapForStorage;
+  }
+
+  /**
+   * Add a blockInfo for notification to the NameNode. If another entry
+   * exists for the same block, it is removed first.
+   *
+   * Callers must synchronize on pendingIncrementalBRperStorage.
+   * @param bInfo the block state change to report
+   * @param storageUuid the storage that holds the block
+   */
+  void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
+      String storageUuid) {
+    // Make sure another entry for the same block is first removed.
+    // There may only be one such entry.
+    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+          pendingIncrementalBRperStorage.entrySet()) {
+      if (entry.getValue().removeBlockInfo(bInfo)) {
+        break;
+      }
+    }
+    getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
+  }
+
   /*
    * Informing the name node could take a long long time! Should we wait
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeBlockImmediately(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      addPendingReplicationBlockInfo(bInfo, storageUuid);
       pendingReceivedRequests++;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
     }
   }
 
-  void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeDeletedBlock(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      addPendingReplicationBlockInfo(bInfo, storageUuid);
     }
   }
 
@@ -334,13 +387,13 @@ class BPServiceActor implements Runnable
    */
   @VisibleForTesting
   void triggerBlockReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastBlockReport = 0;
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastBlockReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -350,12 +403,12 @@ class BPServiceActor implements Runnable
   
   @VisibleForTesting
   void triggerHeartbeatForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastHeartbeat == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -365,13 +418,13 @@ class BPServiceActor implements Runnable
 
   @VisibleForTesting
   void triggerDeletionReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastDeletedReport = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
 
       while (lastDeletedReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -395,23 +448,38 @@ class BPServiceActor implements Runnable
       // a FINALIZED one.
       reportReceivedDeletedBlocks();
 
+      // Send one block report per known storage.
+
       // Create block report
       long brCreateStartTime = now();
-      BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
-          bpos.getBlockPoolId());
+      long totalBlockCount = 0;
+
+      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
 
       // Send block report
       long brSendStartTime = now();
-      StorageBlockReport[] report = { new StorageBlockReport(
-          new DatanodeStorage(bpRegistration.getStorageID()),
-          bReport.getBlockListAsLongs()) };
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
+      StorageBlockReport[] reports =
+          new StorageBlockReport[perVolumeBlockLists.size()];
+
+      int i = 0;
+      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+        DatanodeStorage dnStorage = kvPair.getKey();
+        BlockListAsLongs blockList = kvPair.getValue();
+        totalBlockCount += blockList.getNumberOfBlocks();
+
+        reports[i++] =
+            new StorageBlockReport(
+              dnStorage, blockList.getBlockListAsLongs());
+      }
+
+      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
 
       // Log the block report processing stats from Datanode perspective
       long brSendCost = now() - brSendStartTime;
       long brCreateCost = brSendStartTime - brCreateStartTime;
       dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+      LOG.info("BlockReport of " + totalBlockCount
           + " blocks took " + brCreateCost + " msec to generate and "
           + brSendCost + " msecs for RPC and NN processing");
 
@@ -466,17 +534,15 @@ class BPServiceActor implements Runnable
   }
   
   HeartbeatResponse sendHeartBeat() throws IOException {
+    StorageReport[] reports =
+        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending heartbeat from service actor: " + this);
+      LOG.debug("Sending heartbeat with " + reports.length +
+                " storage reports from service actor: " + this);
     }
-    // reports number of failed volumes
-    StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
-        false,
-        dn.getFSDataset().getCapacity(),
-        dn.getFSDataset().getDfsUsed(),
-        dn.getFSDataset().getRemaining(),
-        dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
-    return bpNamenode.sendHeartbeat(bpRegistration, report,
+
+    return bpNamenode.sendHeartbeat(bpRegistration,
+        reports,
         dn.getFSDataset().getCacheCapacity(),
         dn.getFSDataset().getCacheUsed(),
         dn.getXmitsInProgress(),
@@ -496,9 +562,9 @@ class BPServiceActor implements Runnable
   }
   
   private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-      StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+    Collection<StorageLocation> dataDirs =
+        DataNode.getStorageLocations(dn.getConf());
+    return "DataNode: [" + dataDirs.toString() + "] " +
       " heartbeating to " + nnAddr;
   }
   
@@ -608,10 +674,10 @@ class BPServiceActor implements Runnable
         //
         long waitTime = dnConf.heartBeatInterval - 
         (Time.now() - lastHeartbeat);
-        synchronized(pendingIncrementalBR) {
+        synchronized(pendingIncrementalBRperStorage) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              pendingIncrementalBR.wait(waitTime);
+              pendingIncrementalBRperStorage.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -782,4 +848,68 @@ class BPServiceActor implements Runnable
     }
   }
 
+  private static class PerStoragePendingIncrementalBR {
+    private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+        Maps.newHashMap();
+
+    /**
+     * Return the number of blocks on this storage that have pending
+     * incremental block reports.
+     * @return the number of pending report entries
+     */
+    int getBlockInfoCount() {
+      return pendingIncrementalBR.size();
+    }
+
+    /**
+     * Dequeue and return all pending incremental block report state.
+     * @return the dequeued entries; the queue is left empty
+     */
+    ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+      ReceivedDeletedBlockInfo[] blockInfos =
+          pendingIncrementalBR.values().toArray(
+              new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+      pendingIncrementalBR.clear();
+      return blockInfos;
+    }
+
+    /**
+     * Add blocks from blockArray to pendingIncrementalBR, unless the
+     * block already exists in pendingIncrementalBR.
+     * @param blockArray list of blocks to add.
+     * @return the number of missing blocks that we added.
+     */
+    int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+      int blocksPut = 0;
+      for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+        if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+          pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+          ++blocksPut;
+        }
+      }
+      return blocksPut;
+    }
+
+    /**
+     * Add pending incremental block report for a single block.
+     * @param blockInfo the block state change to queue
+     */
+    void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+    }
+
+    /**
+     * Remove pending incremental block report for a single block if it
+     * exists.
+     *
+     * @param blockInfo the block whose pending report should be removed
+     * @return true if a report was removed, false if no report existed for
+     *         the given block.
+     */
+    boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
+    }
+  }
 }

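The reworked BPServiceActor above replaces the single pendingIncrementalBR map with one queue per storage, drained under the lock and reported outside it. A condensed sketch of that drain/send/re-queue-on-failure flow, with BlockInfo and send() as stand-ins for the real ReceivedDeletedBlockInfo type and the blockReceivedAndDeleted RPC:

import java.util.HashMap;
import java.util.Map;

// Drain each storage's queue under the lock, RPC outside the lock, and on
// failure re-queue only entries not superseded in the meantime (mirroring
// putMissingBlockInfos above). Stand-in types; not the HDFS classes.
class PendingReportsSketch {
  static final class BlockInfo {
    final long blockId;
    BlockInfo(long blockId) { this.blockId = blockId; }
  }

  // storageUuid -> (blockId -> latest pending info)
  private final Map<String, Map<Long, BlockInfo>> perStorage =
      new HashMap<String, Map<Long, BlockInfo>>();

  void reportAll() {
    Map<String, Map<Long, BlockInfo>> drained =
        new HashMap<String, Map<Long, BlockInfo>>();
    synchronized (perStorage) {
      for (Map.Entry<String, Map<Long, BlockInfo>> e
          : perStorage.entrySet()) {
        if (!e.getValue().isEmpty()) {
          drained.put(e.getKey(), new HashMap<Long, BlockInfo>(e.getValue()));
          e.getValue().clear();
        }
      }
    }
    for (Map.Entry<String, Map<Long, BlockInfo>> e : drained.entrySet()) {
      boolean success = false;
      try {
        send(e.getKey(), e.getValue()); // stand-in RPC, may throw
        success = true;
      } finally {
        if (!success) {
          synchronized (perStorage) {
            Map<Long, BlockInfo> queue = perStorage.get(e.getKey());
            for (BlockInfo info : e.getValue().values()) {
              // Re-queue only blocks with no newer pending entry.
              if (!queue.containsKey(info.blockId)) {
                queue.put(info.blockId, info);
              }
            }
          }
        }
      }
    }
  }

  private void send(String storageUuid, Map<Long, BlockInfo> infos) {
    // stand-in for bpNamenode.blockReceivedAndDeleted(...)
  }
}

Performing the RPC outside the synchronized block keeps callers of notifyNamenodeBlockImmediately() from blocking on a slow NameNode.
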
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Dec 13 17:28:14 2013
@@ -187,7 +187,7 @@ class BlockPoolSliceScanner {
         + hours + " hours for block pool " + bpid);
 
     // get the list of blocks and arrange them in random order
-    List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+    List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
     Collections.shuffle(arr);
     
     long scanTime = -1;

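The getFinalizedBlocks() change above narrows the element type from Block to FinalizedReplica; the Collections.shuffle() call on the next line is unaffected, since shuffle is generic over the element type. A trivial illustration, with FinalizedReplica as a stand-in class:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Collections.shuffle works for any element type, so tightening the list
// type needs no change at the call site. Stand-in class, not the HDFS one.
class ShuffleSketch {
  static class FinalizedReplica { }

  public static void main(String[] args) {
    List<FinalizedReplica> arr = new ArrayList<FinalizedReplica>();
    arr.add(new FinalizedReplica());
    Collections.shuffle(arr); // same call as with List<Block>
  }
}
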
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Dec 13 17:28:14 2013
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaInfo = datanode.data.createRbw(block);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@ class BlockReceiver implements Closeable
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
 
+  String getStorageUuid() {
+    return replicaInfo.getStorageUuid();
+  }
+
   /**
    * close files.
    */
@@ -1073,14 +1080,15 @@ class BlockReceiver implements Closeable
           : 0;
       block.setNumBytes(replicaInfo.getNumBytes());
       datanode.data.finalizeBlock(block);
-      datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+      datanode.closeBlock(
+          block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
       if (ClientTraceLog.isInfoEnabled() && isClient) {
         long offset = 0;
         DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
             .getBlockPoolId());
         ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
             myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
-            dnR.getStorageID(), block, endTime - startTime));
+            dnR.getDatanodeUuid(), block, endTime - startTime));
       } else {
         LOG.info("Received " + block + " size " + block.getNumBytes()
             + " from " + inAddr);