You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/23 03:28:49 UTC

svn commit: r1534891 - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ src/main/java/org/apache/hadoop/hdfs/s...

Author: arp
Date: Wed Oct 23 01:28:48 2013
New Revision: 1534891

URL: http://svn.apache.org/r1534891
Log:
HDFS-5390. Send one incremental block report per storage directory.

Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt Wed Oct 23 01:28:48 2013
@@ -42,3 +42,6 @@ IMPROVEMENTS:
 
     HDFS-5398. NameNode changes to process storage reports per storage
     directory. (Arpit Agarwal)
+
+    HDFS-5390. Send one incremental block report per storage directory.
+    (Arpit Agarwal)

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Oct 23 01:28:48 2013
@@ -192,7 +192,8 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
     checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -201,7 +202,7 @@ class BPOfferService {
         delHint);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -218,23 +219,23 @@ class BPOfferService {
         "delHint is null");
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo);
+      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
     }
   }
   
-  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Oct 23 01:28:48 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -94,9 +93,9 @@ class BPServiceActor implements Runnable
    * keyed by block ID, contains the pending changes which have yet to be
    * reported to the NN. Access should be synchronized on this object.
    */
-  private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR 
-    = Maps.newHashMap();
-  
+  private final Map<String, PerStoragePendingIncrementalBR>
+      pendingIncrementalBRperStorage = Maps.newConcurrentMap();
+
   private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -263,64 +262,84 @@ class BPServiceActor implements Runnable
    * @throws IOException
    */
   private void reportReceivedDeletedBlocks() throws IOException {
-
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    synchronized (pendingIncrementalBR) {
-      int numBlocks = pendingIncrementalBR.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = pendingIncrementalBR
-            .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+    // For each storage, check if there are newly received blocks and if
+    // so then send an incremental report to the NameNode.
+    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+        pendingIncrementalBRperStorage.entrySet()) {
+      final String storageUuid = entry.getKey();
+      final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+      // TODO: We can probably use finer-grained synchronization now.
+      synchronized (pendingIncrementalBRperStorage) {
+        if (perStorageMap.getBlockInfoCount() > 0) {
+          // Send newly-received and deleted blockids to namenode
+          receivedAndDeletedBlockArray = perStorageMap.dequeueBlockInfos();
+          pendingReceivedRequests -= receivedAndDeletedBlockArray.length;
+        }
       }
-      pendingIncrementalBR.clear();
-    }
-    if (receivedAndDeletedBlockArray != null) {
-      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          bpRegistration.getDatanodeUuid(), receivedAndDeletedBlockArray) };
-      boolean success = false;
-      try {
-        bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
-            report);
-        success = true;
-      } finally {
-        synchronized (pendingIncrementalBR) {
-          if (!success) {
-            // If we didn't succeed in sending the report, put all of the
-            // blocks back onto our queue, but only in the case where we didn't
-            // put something newer in the meantime.
-            for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
-              if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
-                pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
-              }
+
+      if (receivedAndDeletedBlockArray != null) {
+        StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+            storageUuid, receivedAndDeletedBlockArray) };
+        boolean success = false;
+        try {
+          bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
+              report);
+          success = true;
+        } finally {
+          synchronized (pendingIncrementalBRperStorage) {
+            if (!success) {
+              // If we didn't succeed in sending the report, put all of the
+              // blocks back onto our queue, but only in the case where we
+              // didn't put something newer in the meantime.
+              perStorageMap.putMissingBlockInfos(receivedAndDeletedBlockArray);
+              pendingReceivedRequests += perStorageMap.getBlockInfoCount();
             }
           }
-          pendingReceivedRequests = pendingIncrementalBR.size();
         }
       }
     }
   }
 
+  /**
+   * Retrieve the incremental BR state for a given storage UUID
+   * @param storageUuid
+   * @return
+   */
+  private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+      String storageUuid) {
+    PerStoragePendingIncrementalBR mapForStorage =
+        pendingIncrementalBRperStorage.get(storageUuid);
+
+    if (mapForStorage == null) {
+      // This is the first time we are adding incremental BR state for
+      // this storage so create a new map. This is required once per
+      // storage, per service actor.
+      mapForStorage = new PerStoragePendingIncrementalBR();
+      pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+    }
+
+    return mapForStorage;
+  }
+
   /*
    * Informing the name node could take a long long time! Should we wait
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeBlockImmediately(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
       pendingReceivedRequests++;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
     }
   }
 
-  void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeDeletedBlock(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
     }
   }
 
@@ -329,13 +348,13 @@ class BPServiceActor implements Runnable
    */
   @VisibleForTesting
   void triggerBlockReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastBlockReport = 0;
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastBlockReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -345,12 +364,12 @@ class BPServiceActor implements Runnable
   
   @VisibleForTesting
   void triggerHeartbeatForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastHeartbeat == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -360,13 +379,13 @@ class BPServiceActor implements Runnable
 
   @VisibleForTesting
   void triggerDeletionReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastDeletedReport = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
 
       while (lastDeletedReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -582,10 +601,10 @@ class BPServiceActor implements Runnable
         //
         long waitTime = dnConf.heartBeatInterval - 
         (Time.now() - lastHeartbeat);
-        synchronized(pendingIncrementalBR) {
+        synchronized(pendingIncrementalBRperStorage) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              pendingIncrementalBR.wait(waitTime);
+              pendingIncrementalBRperStorage.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -756,4 +775,52 @@ class BPServiceActor implements Runnable
     }
   }
 
+  private static class PerStoragePendingIncrementalBR {
+    private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+        Maps.newHashMap();
+
+    /**
+     * Return the number of blocks on this storage that have pending
+     * incremental block reports.
+     * @return
+     */
+    int getBlockInfoCount() {
+      return pendingIncrementalBR.size();
+    }
+
+    /**
+     * Dequeue and return all pending incremental block report state.
+     * @return
+     */
+    ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+      ReceivedDeletedBlockInfo[] blockInfos =
+          pendingIncrementalBR.values().toArray(
+              new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+      pendingIncrementalBR.clear();
+      return blockInfos;
+    }
+
+    /**
+     * Add blocks from blockArray to pendingIncrementalBR, unless the
+     * block already exists in pendingIncrementalBR.
+     * @param blockArray list of blocks to add.
+     */
+    void putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+      for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+        if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+          pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+        }
+      }
+    }
+
+    /**
+     * Add pending incremental block report for a single block.
+     * @param blockID
+     * @param blockInfo
+     */
+    void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Oct 23 01:28:48 2013
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaInfo = datanode.data.createRbw(block);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@ class BlockReceiver implements Closeable
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
 
+  public Replica getReplicaInfo() {
+    return replicaInfo;
+  }
+
   /**
    * close files.
    */
@@ -1072,7 +1079,8 @@ class BlockReceiver implements Closeable
           : 0;
       block.setNumBytes(replicaInfo.getNumBytes());
       datanode.data.finalizeBlock(block);
-      datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+      datanode.closeBlock(
+          block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
       if (ClientTraceLog.isInfoEnabled() && isClient) {
         long offset = 0;
         DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Oct 23 01:28:48 2013
@@ -520,10 +520,11 @@ public class DataNode extends Configured
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  protected void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint); 
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -531,10 +532,11 @@ public class DataNode extends Configured
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  protected void notifyNamenodeReceivingBlock(
+      ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivingBlock(block); 
+      bpos.notifyNamenodeReceivingBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
           + block.getBlockPoolId());
@@ -542,10 +544,10 @@ public class DataNode extends Configured
   }
   
   /** Notify the corresponding namenode to delete the block. */
-  public void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if (bpos != null) {
-      bpos.notifyNamenodeDeletedBlock(block);
+      bpos.notifyNamenodeDeletedBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
@@ -1528,11 +1530,11 @@ public class DataNode extends Configured
    * @param block
    * @param delHint
    */
-  void closeBlock(ExtendedBlock block, String delHint) {
+  void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
     metrics.incrBlocksWritten();
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -1892,7 +1894,7 @@ public class DataNode extends Configured
     ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setNumBytes(newLength);
-    notifyNamenodeReceivedBlock(newBlock, "");
+    notifyNamenodeReceivedBlock(newBlock, "", storageID);
     return storageID;
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Oct 23 01:28:48 2013
@@ -447,6 +447,7 @@ class DataXceiver extends Receiver imple
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
+    Replica replica;
     try {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -457,8 +458,10 @@ class DataXceiver extends Receiver imple
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy);
+        replica = blockReceiver.getReplicaInfo();
       } else {
-        datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
+        replica =
+            datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
 
       //
@@ -590,7 +593,8 @@ class DataXceiver extends Receiver imple
       // the block is finalized in the PacketResponder.
       if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+        datanode.closeBlock(
+            block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
             + localAddress + " of size " + block.getNumBytes());
       }
@@ -859,7 +863,8 @@ class DataXceiver extends Receiver imple
           dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, delHint);
+      datanode.notifyNamenodeReceivedBlock(
+          block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
 
       LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
       

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java Wed Oct 23 01:28:48 2013
@@ -54,4 +54,9 @@ public interface Replica {
    * @return the number of bytes that are visible to readers
    */
   public long getVisibleLength();
+
+  /**
+   * Return the storageUuid of the volume that stores this replica.
+   */
+  public String getStorageUuid();
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Oct 23 01:28:48 2013
@@ -137,6 +137,14 @@ abstract public class ReplicaInfo extend
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
   }
+
+  /**
+   * Get the storageUuid of the volume that stores this replica.
+   */
+  @Override
+  public String getStorageUuid() {
+    return volume.getStorageID();
+  }
   
   /**
    * Return the parent directory path where this replica is located

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Oct 23 01:28:48 2013
@@ -243,7 +243,7 @@ public interface FsDatasetSpi<V extends 
    * @param expectedBlockLen the number of bytes the replica is expected to have
    * @throws IOException
    */
-  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
   
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java Wed Oct 23 01:28:48 2013
@@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.StorageTyp
  * This is an interface for the underlying volume.
  */
 public interface FsVolumeSpi {
+  /** @return the StorageUuid of the volume */
+  public String getStorageID();
+
   /** @return a list of block pools. */
   public String[] getBlockPoolList();
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Wed Oct 23 01:28:48 2013
@@ -195,7 +195,7 @@ class FsDatasetAsyncDiskService {
             + " at file " + blockFile + ". Ignored.");
       } else {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
-          datanode.notifyNamenodeDeletedBlock(block);
+          datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
         }
         volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
         LOG.info("Deleted " + block.getBlockPoolId() + " "

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Oct 23 01:28:48 2013
@@ -699,7 +699,7 @@ class FsDatasetImpl implements FsDataset
   }
 
   @Override // FsDatasetSpi
-  public void recoverClose(ExtendedBlock b, long newGS,
+  public Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     // check replica's state
@@ -710,6 +710,7 @@ class FsDatasetImpl implements FsDataset
     if (replicaInfo.getState() == ReplicaState.RBW) {
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
+    return replicaInfo;
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Wed Oct 23 01:28:48 2013
@@ -290,6 +290,7 @@ class FsVolumeImpl implements FsVolumeSp
     }
   }
 
+  @Override
   public String getStorageID() {
     return storageID;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Oct 23 01:28:48 2013
@@ -132,6 +132,11 @@ public class SimulatedFSDataset implemen
     }
 
     @Override
+    public String getStorageUuid() {
+      return storage.getStorageUuid();
+    }
+
+    @Override
     synchronized public long getGenerationStamp() {
       return theBlock.getGenerationStamp();
     }
@@ -314,6 +319,8 @@ public class SimulatedFSDataset implemen
   private static class SimulatedStorage {
     private Map<String, SimulatedBPStorage> map = 
       new HashMap<String, SimulatedBPStorage>();
+    private final String storageUuid = "SimulatedStorage-UUID";
+
     private long capacity;  // in bytes
     
     synchronized long getFree() {
@@ -375,6 +382,10 @@ public class SimulatedFSDataset implemen
       }
       return bpStorage;
     }
+
+    public String getStorageUuid() {
+      return storageUuid;
+    }
   }
   
   private final Map<String, Map<Block, BInfo>> blockMap
@@ -625,7 +636,7 @@ public class SimulatedFSDataset implemen
   }
 
   @Override // FsDatasetSpi
-  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
@@ -639,6 +650,7 @@ public class SimulatedFSDataset implemen
     map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
+    return binfo;
   }
   
   @Override // FsDatasetSpi

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Wed Oct 23 01:28:48 2013
@@ -176,7 +176,7 @@ public class TestBPOfferService {
       waitForBlockReport(mockNN2);
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
 
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
       assertEquals(1, ret.length);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Wed Oct 23 01:28:48 2013
@@ -412,6 +412,11 @@ public class TestDirectoryScanner {
     public StorageType getStorageType() {
       return StorageType.DEFAULT;
     }
+
+    @Override
+    public String getStorageID() {
+      return "";
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();