Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/05/05 19:05:18 UTC

[2/2] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs

HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3954cca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3954cca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3954cca

Branch: refs/heads/trunk
Commit: a3954ccab148bddc290cb96528e63ff19799bcc9
Parents: 4e6bbd0
Author: Chris Douglas <cd...@apache.org>
Authored: Fri May 5 12:01:26 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri May 5 12:01:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  35 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java |  10 +-
 .../datatransfer/DataTransferProtocol.java      |  19 +-
 .../hdfs/protocol/datatransfer/Sender.java      |  29 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  13 +
 .../token/block/BlockTokenIdentifier.java       |  36 +-
 .../src/main/proto/datatransfer.proto           |   4 +
 .../src/main/proto/hdfs.proto                   |   1 +
 .../hdfs/protocol/datatransfer/Receiver.java    |  20 +-
 .../block/BlockPoolTokenSecretManager.java      |  22 +-
 .../token/block/BlockTokenSecretManager.java    |  55 ++--
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   5 +-
 .../hadoop/hdfs/server/balancer/KeyManager.java |   4 +-
 .../server/blockmanagement/BlockManager.java    |   6 +-
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../hdfs/server/datanode/BlockReceiver.java     |  12 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  42 ++-
 .../hdfs/server/datanode/DataXceiver.java       |  68 ++--
 .../erasurecode/ErasureCodingWorker.java        |   3 +-
 .../erasurecode/StripedBlockReader.java         |   2 +-
 .../erasurecode/StripedBlockWriter.java         |  10 +-
 .../erasurecode/StripedReconstructionInfo.java  |  16 +-
 .../datanode/erasurecode/StripedWriter.java     |   5 +-
 .../AvailableSpaceVolumeChoosingPolicy.java     |  20 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   6 +-
 .../RoundRobinVolumeChoosingPolicy.java         |   2 +-
 .../fsdataset/VolumeChoosingPolicy.java         |   5 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  21 +-
 .../datanode/fsdataset/impl/FsVolumeList.java   |  19 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   3 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |  75 ++++-
 .../hadoop/hdfs/TestDataTransferProtocol.java   |   3 +-
 .../hdfs/TestWriteBlockGetsBlockLengthHint.java |   6 +-
 .../security/token/block/TestBlockToken.java    |  98 +++---
 .../server/datanode/BlockReportTestBase.java    |   2 +-
 .../server/datanode/SimulatedFSDataset.java     |  19 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   6 +-
 .../server/datanode/TestBlockReplacement.java   |   2 +-
 .../TestDataXceiverLazyPersistHint.java         |   4 +-
 .../hdfs/server/datanode/TestDiskError.java     |   5 +-
 .../server/datanode/TestSimulatedFSDataset.java |   4 +-
 .../extdataset/ExternalDatasetImpl.java         |  10 +-
 .../TestAvailableSpaceVolumeChoosingPolicy.java |  76 +++--
 .../TestRoundRobinVolumeChoosingPolicy.java     |  29 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |   4 +-
 .../fsdataset/impl/TestFsVolumeList.java        |   2 +-
 .../fsdataset/impl/TestWriteToReplica.java      |  29 +-
 .../namenode/TestNamenodeStorageDirectives.java | 330 +++++++++++++++++++
 48 files changed, 903 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0268537..49c17b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -174,10 +174,12 @@ class DataStreamer extends Daemon {
 
     void sendTransferBlock(final DatanodeInfo[] targets,
         final StorageType[] targetStorageTypes,
+        final String[] targetStorageIDs,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //send the TRANSFER_BLOCK request
       new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
-          dfsClient.clientName, targets, targetStorageTypes);
+          dfsClient.clientName, targets, targetStorageTypes,
+          targetStorageIDs);
       out.flush();
       //ack
       BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -1367,9 +1369,11 @@ class DataStreamer extends Daemon {
       final DatanodeInfo src = original[tried % original.length];
       final DatanodeInfo[] targets = {nodes[d]};
       final StorageType[] targetStorageTypes = {storageTypes[d]};
+      final String[] targetStorageIDs = {storageIDs[d]};
 
       try {
-        transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        transfer(src, targets, targetStorageTypes, targetStorageIDs,
+            lb.getBlockToken());
       } catch (IOException ioe) {
         DFSClient.LOG.warn("Error transferring data from " + src + " to " +
             nodes[d] + ": " + ioe.getMessage());
@@ -1400,6 +1404,7 @@ class DataStreamer extends Daemon {
 
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
                         final StorageType[] targetStorageTypes,
+                        final String[] targetStorageIDs,
                         final Token<BlockTokenIdentifier> blockToken)
       throws IOException {
     //transfer replica to the new datanode
@@ -1412,7 +1417,8 @@ class DataStreamer extends Daemon {
 
         streams = new StreamerStreams(src, writeTimeout, readTimeout,
             blockToken);
-        streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+        streams.sendTransferBlock(targets, targetStorageTypes,
+            targetStorageIDs, blockToken);
         return;
       } catch (InvalidEncryptionKeyException e) {
         policy.recordFailure(e);
@@ -1440,11 +1446,12 @@ class DataStreamer extends Daemon {
       streamerClosed = true;
       return;
     }
-    setupPipelineInternal(nodes, storageTypes);
+    setupPipelineInternal(nodes, storageTypes, storageIDs);
   }
 
   protected void setupPipelineInternal(DatanodeInfo[] datanodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
@@ -1465,7 +1472,8 @@ class DataStreamer extends Daemon {
       accessToken = lb.getBlockToken();
 
       // set up the pipeline again with the remaining nodes
-      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+      success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
+          isRecovery);
 
       failPacket4Testing();
 
@@ -1601,7 +1609,8 @@ class DataStreamer extends Daemon {
   protected LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
+    String[] nextStorageIDs;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1617,10 +1626,12 @@ class DataStreamer extends Daemon {
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
+      nextStorageTypes = lb.getStorageTypes();
+      nextStorageIDs = lb.getStorageIDs();
 
       // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+      success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
+          0L, false);
 
       if (!success) {
         LOG.warn("Abandoning " + block);
@@ -1643,7 +1654,8 @@ class DataStreamer extends Daemon {
   // Returns true if success, otherwise return failure.
   //
   boolean createBlockOutputStream(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
+      long newGS, boolean recoveryFlag) {
     if (nodes.length == 0) {
       LOG.info("nodes are empty for write pipeline of " + block);
       return false;
@@ -1696,7 +1708,8 @@ class DataStreamer extends Daemon {
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS,
             checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
-            (targetPinnings != null && targetPinnings[0]), targetPinnings);
+            (targetPinnings != null && targetPinnings[0]), targetPinnings,
+            nodeStorageIDs[0], nodeStorageIDs);
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(

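Note on the DataStreamer changes above: the pipeline setup now threads a third parallel array, the storage IDs, alongside the datanode and storage-type arrays that DataStreamer already carried. As a rough sketch (not part of the patch; the class and method names are illustrative), all three arrays come from the same LocatedBlock and are indexed together:

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    /** Illustrative only: the three target arrays of a write pipeline are parallel. */
    class PipelineTargets {
      static void print(LocatedBlock lb) {
        DatanodeInfo[] nodes = lb.getLocations();   // target datanodes
        StorageType[] types = lb.getStorageTypes(); // storage type per target
        String[] ids = lb.getStorageIDs();          // storage (volume) ID per target
        for (int i = 0; i < nodes.length; i++) {
          // index i describes the same pipeline node in each array; the first
          // entry is what createBlockOutputStream hands to the first datanode
          System.out.println(nodes[i] + " -> " + types[i] + " / " + ids[i]);
        }
      }
    }
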
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index b457edb..d920f18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -100,9 +100,11 @@ public class StripedDataStreamer extends DataStreamer {
 
     DatanodeInfo[] nodes = lb.getLocations();
     StorageType[] storageTypes = lb.getStorageTypes();
+    String[] storageIDs = lb.getStorageIDs();
 
     // Connect to the DataNode. If fail the internal error state will be set.
-    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+    success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
+        false);
 
     if (!success) {
       block.setCurrentBlock(null);
@@ -121,7 +123,8 @@ public class StripedDataStreamer extends DataStreamer {
 
   @Override
   protected void setupPipelineInternal(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     while (!success && !streamerClosed() && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
@@ -141,7 +144,8 @@ public class StripedDataStreamer extends DataStreamer {
       // set up the pipeline again with the remaining nodes. when a striped
       // data streamer comes here, it must be in external error state.
       assert getErrorState().hasExternalError();
-      success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+      success = createBlockOutputStream(nodes, nodeStorageTypes,
+          nodeStorageIDs, newGS, true);
 
       failPacket4Testing();
       getErrorState().checkRestartingNodeDeadline(nodes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 6c5883c..fe20c37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -101,6 +101,11 @@ public interface DataTransferProtocol {
    *                         written to disk lazily
    * @param pinning whether to pin the block, so Balancer won't move it.
    * @param targetPinnings whether to pin the block on target datanode
+   * @param storageID optional StorageIDs designating where to write the
+   *                  block. An empty String or null indicates that this
+   *                  has not been provided.
+   * @param targetStorageIDs target StorageIDs corresponding to the target
+   *                         datanodes.
    */
   void writeBlock(final ExtendedBlock blk,
       final StorageType storageType,
@@ -118,7 +123,9 @@ public interface DataTransferProtocol {
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException;
+      final boolean[] targetPinnings,
+      final String storageID,
+      final String[] targetStorageIDs) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be
@@ -129,12 +136,15 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param clientName client's name.
    * @param targets target datanodes.
+   * @param targetStorageIDs StorageID designating where to write the
+   *                     block.
    */
   void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException;
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIDs) throws IOException;
 
   /**
    * Request short circuit access file descriptors from a DataNode.
@@ -179,12 +189,15 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param delHint the hint for deleting the block in the original datanode.
    * @param source the source datanode for receiving the block.
+   * @param storageId an optional storage ID to designate where the block is
+   *                  replaced to.
    */
   void replaceBlock(final ExtendedBlock blk,
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException;
+      final DatanodeInfo source,
+      final String storageId) throws IOException;
 
   /**
    * Copy a block.

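Callers of the extended interface just pass the additional String arguments through. A minimal sketch of the new transferBlock call follows (the wrapper class and variable names are assumed; only the Sender call itself reflects the patch):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    /** Sketch: issuing TRANSFER_BLOCK with the new targetStorageIDs argument. */
    class TransferBlockSketch {
      static void transfer(DataOutputStream out, ExtendedBlock blk,
          Token<BlockTokenIdentifier> token, String clientName,
          DatanodeInfo[] targets, StorageType[] targetTypes,
          String[] targetStorageIds) throws IOException {
        // The String[] of storage IDs is the only new argument on this call;
        // it is optional in the sense that entries may be empty or missing.
        new Sender(out).transferBlock(blk, token, clientName, targets,
            targetTypes, targetStorageIds);
        out.flush();
      }
    }
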
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index e133975..8a8d20d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -132,7 +133,9 @@ public class Sender implements DataTransferProtocol {
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
 
@@ -154,11 +157,14 @@ public class Sender implements DataTransferProtocol {
         .setCachingStrategy(getCachingStrategy(cachingStrategy))
         .setAllowLazyPersist(allowLazyPersist)
         .setPinning(pinning)
-        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
-
+        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1))
+        .addAllTargetStorageIds(PBHelperClient.convert(targetStorageIds, 1));
     if (source != null) {
       proto.setSource(PBHelperClient.convertDatanodeInfo(source));
     }
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }
 
     send(out, Op.WRITE_BLOCK, proto.build());
   }
@@ -168,7 +174,8 @@ public class Sender implements DataTransferProtocol {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {
 
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildClientHeader(
@@ -176,6 +183,7 @@ public class Sender implements DataTransferProtocol {
         .addAllTargets(PBHelperClient.convert(targets))
         .addAllTargetStorageTypes(
             PBHelperClient.convertStorageTypes(targetStorageTypes))
+        .addAllTargetStorageIds(Arrays.asList(targetStorageIds))
         .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
@@ -233,15 +241,18 @@ public class Sender implements DataTransferProtocol {
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException {
-    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+      final DatanodeInfo source,
+      final String storageId) throws IOException {
+    OpReplaceBlockProto.Builder proto = OpReplaceBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
         .setStorageType(PBHelperClient.convertStorageType(storageType))
         .setDelHint(delHint)
-        .setSource(PBHelperClient.convertDatanodeInfo(source))
-        .build();
+        .setSource(PBHelperClient.convertDatanodeInfo(source));
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }
 
-    send(out, Op.REPLACE_BLOCK, proto);
+    send(out, Op.REPLACE_BLOCK, proto.build());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 2b8f102..614f653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -345,6 +345,16 @@ public class PBHelperClient {
     return pinnings;
   }
 
+  public static List<String> convert(String[] targetIds, int idx) {
+    List<String> ids = new ArrayList<>();
+    if (targetIds != null) {
+      for (; idx < targetIds.length; ++idx) {
+        ids.add(targetIds[idx]);
+      }
+    }
+    return ids;
+  }
+
   public static ExtendedBlock convert(ExtendedBlockProto eb) {
     if (eb == null) return null;
     return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
@@ -640,6 +650,9 @@ public class PBHelperClient {
     for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
       builder.addStorageTypes(convertStorageType(storageType));
     }
+    for (String storageId : blockTokenSecret.getStorageIds()) {
+      builder.addStorageIds(storageId);
+    }
     return builder.build();
   }
 

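The new convert(String[], int) helper mirrors the boolean[] conversion directly above it: copy the array into a List starting from the given index, and treat a null array as empty. Starting from index 1 is how Sender forwards only the storage IDs meant for the downstream datanodes. A small illustration (class name and values are made up):

    import java.util.List;
    import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

    class ConvertDemo {
      public static void main(String[] args) {
        String[] ids = {"DS-local", "DS-dn2", "DS-dn3"};
        // idx = 1 skips the entry for the current node in the pipeline.
        List<String> downstream = PBHelperClient.convert(ids, 1);
        System.out.println(downstream);                      // [DS-dn2, DS-dn3]
        System.out.println(PBHelperClient.convert(null, 1)); // [] for null input
      }
    }
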
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 228a7b6..5950752 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -53,16 +53,19 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   private long blockId;
   private final EnumSet<AccessMode> modes;
   private StorageType[] storageTypes;
+  private String[] storageIds;
   private boolean useProto;
 
   private byte [] cache;
 
   public BlockTokenIdentifier() {
-    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, null,
+        false);
   }
 
   public BlockTokenIdentifier(String userId, String bpid, long blockId,
-      EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
+      EnumSet<AccessMode> modes, StorageType[] storageTypes,
+      String[] storageIds, boolean useProto) {
     this.cache = null;
     this.userId = userId;
     this.blockPoolId = bpid;
@@ -70,6 +73,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
     this.storageTypes = Optional.ofNullable(storageTypes)
                                 .orElse(StorageType.EMPTY_ARRAY);
+    this.storageIds = Optional.ofNullable(storageIds)
+                              .orElse(new String[0]);
     this.useProto = useProto;
   }
 
@@ -125,6 +130,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return storageTypes;
   }
 
+  public String[] getStorageIds(){
+    return storageIds;
+  }
+
   @Override
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
@@ -132,7 +141,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
         + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
         + this.getAccessModes() + ", storageTypes= "
-        + Arrays.toString(this.getStorageTypes()) + ")";
+        + Arrays.toString(this.getStorageTypes()) + ", storageIds= "
+        + Arrays.toString(this.getStorageIds()) + ")";
   }
 
   static boolean isEqual(Object a, Object b) {
@@ -151,7 +161,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
           && isEqual(this.blockPoolId, that.blockPoolId)
           && this.blockId == that.blockId
           && isEqual(this.modes, that.modes)
-          && Arrays.equals(this.storageTypes, that.storageTypes);
+          && Arrays.equals(this.storageTypes, that.storageTypes)
+          && Arrays.equals(this.storageIds, that.storageIds);
     }
     return false;
   }
@@ -161,7 +172,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
         ^ (userId == null ? 0 : userId.hashCode())
         ^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
-        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
+        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes))
+        ^ (storageIds == null ? 0 : Arrays.hashCode(storageIds));
   }
 
   /**
@@ -220,6 +232,14 @@ public class BlockTokenIdentifier extends TokenIdentifier {
       readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
     }
     storageTypes = readStorageTypes;
+
+    length = WritableUtils.readVInt(in);
+    String[] readStorageIds = new String[length];
+    for (int i = 0; i < length; i++) {
+      readStorageIds[i] = WritableUtils.readString(in);
+    }
+    storageIds = readStorageIds;
+
     useProto = false;
   }
 
@@ -248,6 +268,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
         .map(PBHelperClient::convertStorageType)
         .toArray(StorageType[]::new);
+    storageIds = blockTokenSecretProto.getStorageIdsList().stream()
+        .toArray(String[]::new);
     useProto = true;
   }
 
@@ -275,6 +297,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (StorageType type: storageTypes){
       WritableUtils.writeEnum(out, type);
     }
+    WritableUtils.writeVInt(out, storageIds.length);
+    for (String id: storageIds) {
+      WritableUtils.writeString(out, id);
+    }
   }
 
   @VisibleForTesting

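BlockTokenIdentifier now stores the storage IDs next to the storage types and includes them in equals(), hashCode(), toString() and both serialized forms (the legacy writable appends a vint count followed by the strings). A construction sketch with made-up values:

    import java.util.Arrays;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;

    class BlockTokenIdDemo {
      public static void main(String[] args) {
        BlockTokenIdentifier id = new BlockTokenIdentifier(
            "hdfs", "BP-1", 1234L,
            EnumSet.of(AccessMode.WRITE),
            new StorageType[] {StorageType.DISK},
            new String[] {"DS-volume-1"},      // new storageIds argument
            false /* legacy writable encoding */);
        // A null storageIds argument is normalized to an empty array, so
        // existing callers that do not know about storage IDs keep working.
        System.out.println(Arrays.toString(id.getStorageIds()));
        System.out.println(id); // toString() now lists the storage IDs too
      }
    }
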
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 889361a..2356201 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -125,12 +125,15 @@ message OpWriteBlockProto {
   //whether to pin the block, so Balancer won't move it.
   optional bool pinning = 14 [default = false];
   repeated bool targetPinnings = 15;
+  optional string storageId = 16;
+  repeated string targetStorageIds = 17;
 }
   
 message OpTransferBlockProto {
   required ClientOperationHeaderProto header = 1;
   repeated DatanodeInfoProto targets = 2;
   repeated StorageTypeProto targetStorageTypes = 3;
+  repeated string targetStorageIds = 4;
 }
 
 message OpReplaceBlockProto {
@@ -138,6 +141,7 @@ message OpReplaceBlockProto {
   required string delHint = 2;
   required DatanodeInfoProto source = 3;
   optional StorageTypeProto storageType = 4 [default = DISK];
+  optional string storageId = 5;
 }
 
 message OpCopyBlockProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 3e27427..08ed3c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -570,4 +570,5 @@ message BlockTokenSecretProto {
   optional uint64 blockId = 5;
   repeated AccessModeProto modes = 6;
   repeated StorageTypeProto storageTypes = 7;
+  repeated string storageIds = 8;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 08ab967..bab2e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -185,7 +187,9 @@ public abstract class Receiver implements DataTransferProtocol {
             CachingStrategy.newDefaultStrategy()),
           (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
           (proto.hasPinning() ? proto.getPinning(): false),
-          (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())));
+          (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
+          proto.getStorageId(),
+          proto.getTargetStorageIdsList().toArray(new String[0]));
     } finally {
      if (traceScope != null) traceScope.close();
     }
@@ -199,11 +203,18 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
+      final ExtendedBlock block =
+          PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock());
+      final StorageType[] targetStorageTypes =
+          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(),
+              targets.length);
+      transferBlock(block,
           PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
-          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+          targetStorageTypes,
+          proto.getTargetStorageIdsList().toArray(new String[0])
+      );
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -264,7 +275,8 @@ public abstract class Receiver implements DataTransferProtocol {
           PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelperClient.convert(proto.getHeader().getToken()),
           proto.getDelHint(),
-          PBHelperClient.convert(proto.getSource()));
+          PBHelperClient.convert(proto.getSource()),
+          proto.getStorageId());
     } finally {
       if (traceScope != null) traceScope.close();
     }

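On the read side the new fields are taken straight from the protobuf messages. storageId is an optional proto2 string, so getStorageId() returns the empty string when the sender never set it, which lines up with the "empty String or null means not provided" wording in the DataTransferProtocol javadoc; the repeated targetStorageIds field simply yields an empty list. A rough sketch of that unpacking (the helper class is illustrative):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;

    class ReceiverSketch {
      static void unpack(OpWriteBlockProto proto) {
        // "" when the writer predates this change or named no storage.
        String storageId = proto.getStorageId();
        // Empty list (hence empty array) when no target storage IDs were sent.
        String[] targetStorageIds =
            proto.getTargetStorageIdsList().toArray(new String[0]);
        System.out.println("storageId='" + storageId + "', targets="
            + targetStorageIds.length);
      }
    }
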
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 29fb73f..8400b4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -84,25 +84,27 @@ public class BlockPoolTokenSecretManager extends
   /**
    * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
    *                String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   *                StorageType[])}
+   *                StorageType[], String[])}
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }
 
   /**
    * See {@link BlockTokenSecretManager#checkAccess(Token, String,
    *                ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   *                StorageType[])}.
+   *                StorageType[], String[])}
    */
   public void checkAccess(Token<BlockTokenIdentifier> token,
       String userId, ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }
 
   /**
@@ -115,11 +117,13 @@ public class BlockPoolTokenSecretManager extends
 
   /**
    * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
-   *  StorageType[])}
+   *  StorageType[], String[])}.
    */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
-      EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
-    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
+      EnumSet<AccessMode> of, StorageType[] storageTypes, String[] storageIds)
+      throws IOException {
+    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes,
+        storageIds);
   }
   
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index f3bec83..6b54490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -247,18 +247,19 @@ public class BlockTokenSecretManager extends
   /** Generate an block token for current user */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
       EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
-    return generateToken(userID, block, modes, storageTypes);
+    return generateToken(userID, block, modes, storageTypes, storageIds);
   }
 
   /** Generate a block token for a specified user */
   public Token<BlockTokenIdentifier> generateToken(String userId,
       ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
+        .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
+        storageIds, useProto);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -272,10 +273,13 @@ public class BlockTokenSecretManager extends
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     checkAccess(id, userId, block, mode);
     if (storageTypes != null && storageTypes.length > 0) {
-      checkAccess(id.getStorageTypes(), storageTypes);
+      checkAccess(id.getStorageTypes(), storageTypes, "StorageTypes");
+    }
+    if (storageIds != null && storageIds.length > 0) {
+      checkAccess(id.getStorageIds(), storageIds, "StorageIDs");
     }
   }
 
@@ -309,30 +313,31 @@ public class BlockTokenSecretManager extends
   }
 
   /**
-   * Check if the requested StorageTypes match the StorageTypes in the
-   * BlockTokenIdentifier.
-   * Empty candidateStorageTypes specifiers mean 'all is permitted'. They
-   * would otherwise be nonsensical.
+   * Check if the requested values can be satisfied with the values in the
+   * BlockToken. This is intended for use with StorageTypes and StorageIDs.
+   *
+   * The current node can only verify that one of the storage [Type|ID] is
+   * available. The rest will be on different nodes.
    */
-  public static void checkAccess(StorageType[] candidateStorageTypes,
-      StorageType[] storageTypesRequested) throws InvalidToken {
-    if (storageTypesRequested.length == 0) {
-      throw new InvalidToken("The request has no StorageTypes. "
+  public static <T> void checkAccess(T[] candidates, T[] requested, String msg)
+      throws InvalidToken {
+    if (requested.length == 0) {
+      throw new InvalidToken("The request has no " + msg + ". "
           + "This is probably a configuration error.");
     }
-    if (candidateStorageTypes.length == 0) {
+    if (candidates.length == 0) {
       return;
     }
 
-    List<StorageType> unseenCandidates = new ArrayList<StorageType>();
-    unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
-    for (StorageType storageType : storageTypesRequested) {
-      final int index = unseenCandidates.indexOf(storageType);
+    List unseenCandidates = new ArrayList<T>();
+    unseenCandidates.addAll(Arrays.asList(candidates));
+    for (T req : requested) {
+      final int index = unseenCandidates.indexOf(req);
       if (index == -1) {
-        throw new InvalidToken("Block token with StorageTypes "
-            + Arrays.toString(candidateStorageTypes)
-            + " not valid for access with StorageTypes "
-            + Arrays.toString(storageTypesRequested));
+        throw new InvalidToken("Block token with " + msg + " "
+            + Arrays.toString(candidates)
+            + " not valid for access with " + msg + " "
+            + Arrays.toString(requested));
       }
       Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
       unseenCandidates.remove(unseenCandidates.size()-1);
@@ -342,7 +347,7 @@ public class BlockTokenSecretManager extends
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token
@@ -352,7 +357,7 @@ public class BlockTokenSecretManager extends
           "Unable to de-serialize block token identifier for user=" + userId
               + ", block=" + block + ", access mode=" + mode);
     }
-    checkAccess(id, userId, block, mode, storageTypes);
+    checkAccess(id, userId, block, mode, storageTypes, storageIds);
     if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't have the correct token password");

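The old StorageType-only check becomes a generic helper shared by storage types and storage IDs: an empty requested array is rejected as a configuration error, an empty candidate array in the token means anything is permitted, and otherwise every requested value must be matched by a distinct candidate (the swap-and-remove loop enforces that multiplicity). A small illustration using storage IDs and only the static method above (values are made up):

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.security.token.SecretManager.InvalidToken;

    class CheckAccessDemo {
      public static void main(String[] args) throws InvalidToken {
        String[] inToken = {"DS-1", "DS-2"};

        // OK: the one requested ID is covered by a distinct candidate.
        BlockTokenSecretManager.checkAccess(inToken,
            new String[] {"DS-2"}, "StorageIDs");

        // OK: an empty candidate list in the token permits everything.
        BlockTokenSecretManager.checkAccess(new String[0],
            new String[] {"DS-9"}, "StorageIDs");

        // Throws InvalidToken: DS-9 is not among the token's candidates.
        BlockTokenSecretManager.checkAccess(inToken,
            new String[] {"DS-9"}, "StorageIDs");
      }
    }
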
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 91dc907..f855e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -357,7 +357,7 @@ public class Dispatcher {
             reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager(); 
         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
-            new StorageType[]{target.storageType});
+            new StorageType[]{target.storageType}, new String[0]);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
             unbufIn, km, accessToken, target.getDatanodeInfo());
         unbufOut = saslStreams.out;
@@ -411,7 +411,8 @@ public class Dispatcher {
     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
-          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
+          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
+          null);
     }
 
     /** Check whether to continue waiting for response */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 06bf07f..faf95b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -95,7 +95,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
 
   /** Get an access token for a block. */
   public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     if (!isBlockTokenEnabled) {
       return BlockTokenSecretManager.DUMMY_TOKEN;
     } else {
@@ -105,7 +105,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
       }
       return blockTokenSecretManager.generateToken(null, eb,
           EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
-              BlockTokenIdentifier.AccessMode.COPY), storageTypes);
+              BlockTokenIdentifier.AccessMode.COPY), storageTypes, storageIds);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e63930a..8f58e25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1283,13 +1283,15 @@ public class BlockManager implements BlockStatsMXBean {
           internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
           blockTokens[i] = blockTokenSecretManager.generateToken(
               NameNode.getRemoteUser().getShortUserName(),
-              internalBlock, EnumSet.of(mode), b.getStorageTypes());
+              internalBlock, EnumSet.of(mode), b.getStorageTypes(),
+              b.getStorageIDs());
         }
         sb.setBlockTokens(blockTokens);
       } else {
         b.setBlockToken(blockTokenSecretManager.generateToken(
             NameNode.getRemoteUser().getShortUserName(),
-            b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
+            b.getBlock(), EnumSet.of(mode), b.getStorageTypes(),
+            b.getStorageIDs()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index e0daca7..042169a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -679,7 +679,8 @@ class BPOfferService {
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
       dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
-          bcmd.getTargets(), bcmd.getTargetStorageTypes());
+          bcmd.getTargets(), bcmd.getTargetStorageTypes(),
+          bcmd.getTargetStorageIDs());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
       //

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 00109e0..2ab4067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -151,7 +151,8 @@ class BlockReceiver implements Closeable {
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
-      final boolean pinning) throws IOException {
+      final boolean pinning,
+      final String storageId) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -197,6 +198,7 @@ class BlockReceiver implements Closeable {
             + "\n allowLazyPersist=" + allowLazyPersist + ", pinning=" + pinning
             + ", isClient=" + isClient + ", isDatanode=" + isDatanode
             + ", responseInterval=" + responseInterval
+            + ", storageID=" + (storageId != null ? storageId : "null")
         );
       }
 
@@ -204,11 +206,13 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler =
+            datanode.data.createTemporary(storageType, storageId, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, storageId,
+              block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
@@ -233,7 +237,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              datanode.data.createTemporary(storageType, storageId, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 66ef89a..2305e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1943,7 +1943,7 @@ public class DataNode extends ReconfigurableBase
         LOG.debug("Got: " + id.toString());
       }
       blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
-          null);
+          null, null);
     }
   }
 
@@ -2224,7 +2224,8 @@ public class DataNode extends ReconfigurableBase
 
   @VisibleForTesting
   void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
-      StorageType[] xferTargetStorageTypes) throws IOException {
+      StorageType[] xferTargetStorageTypes, String[] xferTargetStorageIDs)
+      throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
 
@@ -2281,17 +2282,19 @@ public class DataNode extends ReconfigurableBase
       LOG.info(bpReg + " Starting thread to transfer " + 
                block + " to " + xfersBuilder);                       
 
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
+      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
+          xferTargetStorageIDs, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
   void transferBlocks(String poolId, Block blocks[],
-      DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
+      DatanodeInfo[][] xferTargets, StorageType[][] xferTargetStorageTypes,
+      String[][] xferTargetStorageIDs) {
     for (int i = 0; i < blocks.length; i++) {
       try {
         transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
-            xferTargetStorageTypes[i]);
+            xferTargetStorageTypes[i], xferTargetStorageIDs[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -2395,6 +2398,7 @@ public class DataNode extends ReconfigurableBase
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
     final StorageType[] targetStorageTypes;
+    final private String[] targetStorageIds;
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
@@ -2406,8 +2410,8 @@ public class DataNode extends ReconfigurableBase
      * entire target list, the block, and the data.
      */
     DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
-        ExtendedBlock b, BlockConstructionStage stage,
-        final String clientname) {
+        String[] targetStorageIds, ExtendedBlock b,
+        BlockConstructionStage stage, final String clientname) {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
             + b + " (numBytes=" + b.getNumBytes() + ")"
@@ -2415,10 +2419,13 @@ public class DataNode extends ReconfigurableBase
             + ", clientname=" + clientname
             + ", targets=" + Arrays.asList(targets)
             + ", target storage types=" + (targetStorageTypes == null ? "[]" :
-            Arrays.asList(targetStorageTypes)));
+            Arrays.asList(targetStorageTypes))
+            + ", target storage IDs=" + (targetStorageIds == null ? "[]" :
+            Arrays.asList(targetStorageIds)));
       }
       this.targets = targets;
       this.targetStorageTypes = targetStorageTypes;
+      this.targetStorageIds = targetStorageIds;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -2456,7 +2463,7 @@ public class DataNode extends ReconfigurableBase
         //
         Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
             EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes);
+            targetStorageTypes, targetStorageIds);
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2477,10 +2484,13 @@ public class DataNode extends ReconfigurableBase
         DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
             .build();
 
+        String storageId = targetStorageIds.length > 0 ?
+            targetStorageIds[0] : null;
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null);
+            false, false, null, storageId,
+            targetStorageIds);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
@@ -2540,12 +2550,12 @@ public class DataNode extends ReconfigurableBase
    */
   public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
       EnumSet<AccessMode> mode,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     Token<BlockTokenIdentifier> accessToken = 
         BlockTokenSecretManager.DUMMY_TOKEN;
     if (isBlockTokenEnabled) {
       accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
-          storageTypes);
+          storageTypes, storageIds);
     }
     return accessToken;
   }
@@ -2918,7 +2928,7 @@ public class DataNode extends ReconfigurableBase
           LOG.debug("Got: " + id.toString());
         }
         blockPoolTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenIdentifier.AccessMode.READ, null);
+            BlockTokenIdentifier.AccessMode.READ, null, null);
       }
     }
   }
@@ -2934,7 +2944,8 @@ public class DataNode extends ReconfigurableBase
    */
   void transferReplicaForPipelineRecovery(final ExtendedBlock b,
       final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
-      final String client) throws IOException {
+      final String[] targetStorageIds, final String client)
+      throws IOException {
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2967,7 +2978,8 @@ public class DataNode extends ReconfigurableBase
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
+      new DataTransfer(targets, targetStorageTypes, targetStorageIds, b, stage,
+          client).run();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index cc13799..d42e330 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -354,7 +354,8 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
     DataOutputStream out = getBufferedOutputStream();
     checkAccess(out, true, blk, token,
-        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
+        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ,
+        null, null);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
     SlotId registeredSlotId = null;
@@ -662,7 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientname,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
+      final StorageType[] targetStorageTypes,
       final DatanodeInfo srcDataNode,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -673,7 +674,9 @@ class DataXceiver extends Receiver implements Runnable {
       CachingStrategy cachingStrategy,
       boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -692,8 +695,15 @@ class DataXceiver extends Receiver implements Runnable {
     if (targetStorageTypes.length > 0) {
       System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
     }
+    int nsi = targetStorageIds.length;
+    String[] storageIds = new String[nsi + 1];
+    storageIds[0] = storageId;
+    if (targetStorageTypes.length > 0) {
+      System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
+    }
     checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
-        BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
+        BlockTokenIdentifier.AccessMode.WRITE,
+        storageTypes, storageIds);
 
     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
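
The hunk above prepends the receiving datanode's own storage ID to the IDs of the downstream targets, mirroring what is already done for the storage types, so index 0 always describes the local replica. Taken in isolation, and with placeholder IDs, the array handling amounts to:

    // Illustrative only: assemble the pipeline-wide storage ID array.
    String storageId = "DS-local-example";                    // this node
    String[] targetStorageIds = { "DS-mid-example", "DS-tail-example" };
    String[] storageIds = new String[targetStorageIds.length + 1];
    storageIds[0] = storageId;                                // local replica first
    System.arraycopy(targetStorageIds, 0, storageIds, 1,
        targetStorageIds.length);
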
@@ -743,7 +753,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist, pinning));
+            cachingStrategy, allowLazyPersist, pinning, storageId));
         replica = blockReceiver.getReplica();
       } else {
         replica = datanode.data.recoverClose(
@@ -796,16 +806,18 @@ class DataXceiver extends Receiver implements Runnable {
 
           if (targetPinnings != null && targetPinnings.length > 0) {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy,
-                allowLazyPersist, targetPinnings[0], targetPinnings);
+                blockToken, clientname, targets, targetStorageTypes,
+                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                allowLazyPersist, targetPinnings[0], targetPinnings,
+                targetStorageIds[0], targetStorageIds);
           } else {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy,
-                allowLazyPersist, false, targetPinnings);
+                blockToken, clientname, targets, targetStorageTypes,
+                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                allowLazyPersist, false, targetPinnings,
+                targetStorageIds[0], targetStorageIds);
           }
 
           mirrorOut.flush();
@@ -929,17 +941,19 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
-        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
+        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes,
+        targetStorageIds);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
-          targetStorageTypes, clientName);
+          targetStorageTypes, targetStorageIds, clientName);
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
@@ -1105,12 +1119,14 @@ class DataXceiver extends Receiver implements Runnable {
       final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo proxySource) throws IOException {
+      final DatanodeInfo proxySource,
+      final String storageId) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     checkAccess(replyOut, true, block, blockToken,
         Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
-        new StorageType[]{ storageType });
+        new StorageType[]{storageType},
+        new String[]{storageId});
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1131,7 +1147,7 @@ class DataXceiver extends Receiver implements Runnable {
       // Move the block to different storage in the same datanode
       if (proxySource.equals(datanode.getDatanodeId())) {
         ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
-            storageType);
+            storageType, storageId);
         if (oldReplica != null) {
           LOG.info("Moved " + block + " from StorageType "
               + oldReplica.getVolume().getStorageType() + " to " + storageType);
@@ -1188,7 +1204,7 @@ class DataXceiver extends Receiver implements Runnable {
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false, false));
+            CachingStrategy.newDropBehind(), false, false, storageId));
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 
@@ -1258,11 +1274,12 @@ class DataXceiver extends Receiver implements Runnable {
       final DataNode dn, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
-      final boolean pinning) throws IOException {
+      final boolean pinning,
+      final String storageId) throws IOException {
     return new BlockReceiver(block, storageType, in,
         inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
         clientname, srcDataNode, dn, requestedChecksum,
-        cachingStrategy, allowLazyPersist, pinning);
+        cachingStrategy, allowLazyPersist, pinning, storageId);
   }
 
   /**
@@ -1365,7 +1382,7 @@ class DataXceiver extends Receiver implements Runnable {
   private void checkAccess(OutputStream out, final boolean reply,
       ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
       BlockTokenIdentifier.AccessMode mode) throws IOException {
-    checkAccess(out, reply, blk, t, op, mode, null);
+    checkAccess(out, reply, blk, t, op, mode, null, null);
   }
 
   private void checkAccess(OutputStream out, final boolean reply,
@@ -1373,7 +1390,8 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> t,
       final Op op,
       final BlockTokenIdentifier.AccessMode mode,
-      final StorageType[] storageTypes) throws IOException {
+      final StorageType[] storageTypes,
+      final String[] storageIds) throws IOException {
     checkAndWaitForBP(blk);
     if (datanode.isBlockTokenEnabled) {
       if (LOG.isDebugEnabled()) {
@@ -1382,7 +1400,7 @@ class DataXceiver extends Receiver implements Runnable {
       }
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
-            storageTypes);
+            storageTypes, storageIds);
       } catch(InvalidToken e) {
         try {
           if (reply) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 1492e5d..e076dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -111,7 +111,8 @@ public final class ErasureCodingWorker {
             new StripedReconstructionInfo(
             reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
             reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
-            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
+            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
+            reconInfo.getTargetStorageIDs());
         final StripedBlockReconstructor task =
             new StripedBlockReconstructor(this, stripedReconInfo);
         if (task.hasValidTargets()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index b3884c2..39ef67e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -110,7 +110,7 @@ class StripedBlockReader {
           stripedReader.getSocketAddress4Transfer(source);
       Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
           block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
-          StorageType.EMPTY_ARRAY);
+          StorageType.EMPTY_ARRAY, new String[0]);
         /*
          * This can be further improved if the replica is local, then we can
          * read directly from DN and need to check the replica is FINALIZED

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index a6989d4..24c1d61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -61,6 +61,7 @@ class StripedBlockWriter {
   private final ExtendedBlock block;
   private final DatanodeInfo target;
   private final StorageType storageType;
+  private final String storageId;
 
   private Socket targetSocket;
   private DataOutputStream targetOutputStream;
@@ -72,8 +73,8 @@ class StripedBlockWriter {
 
   StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
                      Configuration conf, ExtendedBlock block,
-                     DatanodeInfo target, StorageType storageType)
-      throws IOException {
+                     DatanodeInfo target, StorageType storageType,
+                     String storageId) throws IOException {
     this.stripedWriter = stripedWriter;
     this.datanode = datanode;
     this.conf = conf;
@@ -81,6 +82,7 @@ class StripedBlockWriter {
     this.block = block;
     this.target = target;
     this.storageType = storageType;
+    this.storageId = storageId;
 
     this.targetBuffer = stripedWriter.allocateWriteBuffer();
 
@@ -117,7 +119,7 @@ class StripedBlockWriter {
       Token<BlockTokenIdentifier> blockToken =
           datanode.getBlockAccessToken(block,
               EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-              new StorageType[]{storageType});
+              new StorageType[]{storageType}, new String[]{storageId});
 
       long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
       OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
@@ -141,7 +143,7 @@ class StripedBlockWriter {
           new StorageType[]{storageType}, source,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
           stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(),
-          false, false, null);
+          false, false, null, storageId, new String[]{storageId});
 
       targetSocket = socket;
       targetOutputStream = out;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
index a5c328b..a619c34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -40,24 +40,27 @@ public class StripedReconstructionInfo {
   private final byte[] targetIndices;
   private final DatanodeInfo[] targets;
   private final StorageType[] targetStorageTypes;
+  private final String[] targetStorageIds;
 
   public StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
       byte[] targetIndices) {
-    this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null);
+    this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
+        null, null);
   }
 
   StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
-      DatanodeInfo[] targets, StorageType[] targetStorageTypes) {
+      DatanodeInfo[] targets, StorageType[] targetStorageTypes,
+      String[] targetStorageIds) {
     this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
-        targetStorageTypes);
+        targetStorageTypes, targetStorageIds);
   }
 
   private StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
       byte[] targetIndices, DatanodeInfo[] targets,
-      StorageType[] targetStorageTypes) {
+      StorageType[] targetStorageTypes, String[] targetStorageIds) {
 
     this.blockGroup = blockGroup;
     this.ecPolicy = ecPolicy;
@@ -66,6 +69,7 @@ public class StripedReconstructionInfo {
     this.targetIndices = targetIndices;
     this.targets = targets;
     this.targetStorageTypes = targetStorageTypes;
+    this.targetStorageIds = targetStorageIds;
   }
 
   ExtendedBlock getBlockGroup() {
@@ -95,5 +99,9 @@ public class StripedReconstructionInfo {
   StorageType[] getTargetStorageTypes() {
     return targetStorageTypes;
   }
+
+  String[] getTargetStorageIds() {
+    return targetStorageIds;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index 225a7ed..762506c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -55,6 +55,7 @@ class StripedWriter {
   private final short[] targetIndices;
   private boolean hasValidTargets;
   private final StorageType[] targetStorageTypes;
+  private final String[] targetStorageIds;
 
   private StripedBlockWriter[] writers;
 
@@ -77,6 +78,8 @@ class StripedWriter {
     assert targets != null;
     this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
     assert targetStorageTypes != null;
+    this.targetStorageIds = stripedReconInfo.getTargetStorageIds();
+    assert targetStorageIds != null;
 
     writers = new StripedBlockWriter[targets.length];
 
@@ -192,7 +195,7 @@ class StripedWriter {
   private StripedBlockWriter createWriter(short index) throws IOException {
     return new StripedBlockWriter(this, datanode, conf,
         reconstructor.getBlock(targetIndices[index]), targets[index],
-        targetStorageTypes[index]);
+        targetStorageTypes[index], targetStorageIds[index]);
   }
 
   ByteBuffer allocateWriteBuffer() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index 39d9547..efe222f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -113,8 +113,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       new RoundRobinVolumeChoosingPolicy<V>();
 
   @Override
-  public V chooseVolume(List<V> volumes,
-      long replicaSize) throws IOException {
+  public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+      throws IOException {
     if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -125,19 +125,20 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
             storageType.ordinal() : StorageType.DEFAULT.ordinal();
 
     synchronized (syncLocks[index]) {
-      return doChooseVolume(volumes, replicaSize);
+      return doChooseVolume(volumes, replicaSize, storageId);
     }
   }
 
-  private V doChooseVolume(final List<V> volumes,
-                         long replicaSize) throws IOException {
+  private V doChooseVolume(final List<V> volumes, long replicaSize,
+      String storageId) throws IOException {
     AvailableSpaceVolumeList volumesWithSpaces =
         new AvailableSpaceVolumeList(volumes);
     
     if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
       // If they're actually not too far out of whack, fall back on pure round
       // robin.
-      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
+      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize,
+          storageId);
       if (LOG.isDebugEnabled()) {
         LOG.debug("All volumes are within the configured free space balance " +
             "threshold. Selecting " + volume + " for write of block size " +
@@ -165,7 +166,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       if (mostAvailableAmongLowVolumes < replicaSize ||
           random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
-            highAvailableVolumes, replicaSize);
+            highAvailableVolumes, replicaSize, storageId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from high available space volumes for write of block size "
@@ -173,7 +174,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
         }
       } else {
         volume = roundRobinPolicyLowAvailable.chooseVolume(
-            lowAvailableVolumes, replicaSize);
+            lowAvailableVolumes, replicaSize, storageId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from low available space volumes for write of block size "
@@ -266,7 +267,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
   
   /**
    * Used so that we only check the available space on a given volume once, at
-   * the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}.
+   * the beginning of
+   * {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}.
    */
   private class AvailableSpaceVolumePair {
     private final V volume;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 9e979f7..d7e29cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -318,7 +318,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  ReplicaHandler createTemporary(StorageType storageType,
+  ReplicaHandler createTemporary(StorageType storageType, String storageId,
       ExtendedBlock b) throws IOException;
 
   /**
@@ -328,7 +328,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  ReplicaHandler createRbw(StorageType storageType,
+  ReplicaHandler createRbw(StorageType storageType, String storageId,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
@@ -623,7 +623,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
      * Move block from one storage to another storage
      */
    ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
-        StorageType targetStorageType) throws IOException;
+        StorageType targetStorageType, String storageId) throws IOException;
 
   /**
    * Set a block to be pinned on this datanode so that it cannot be moved

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 9474b92..b9bcf1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -50,7 +50,7 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
   }
 
   @Override
-  public V chooseVolume(final List<V> volumes, long blockSize)
+  public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
       throws IOException {
 
     if (volumes.size() < 1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
index 62b1e75..8cbc058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
@@ -36,8 +36,11 @@ public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
    * 
    * @param volumes - a list of available volumes.
    * @param replicaSize - the size of the replica for which a volume is sought.
+   * @param storageId - the storage id of the Volume nominated by the namenode.
+   *                  This can usually be ignored by the VolumeChoosingPolicy.
    * @return the chosen volume.
    * @throws IOException when disks are unavailable or are full.
    */
-  public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
+  V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+      throws IOException;
 }
\ No newline at end of file
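
VolumeChoosingPolicy implementations now receive the storage ID the namenode nominated for the replica; as the new javadoc notes, a policy is free to ignore it. A hedged sketch of a policy that honours the hint when the nominated volume still has room and otherwise falls back to the first volume that fits; the class name and fallback behaviour are invented for illustration and are not part of the patch:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
    import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

    // Hedged sketch, not part of the patch: prefer the nominated volume when
    // it can hold the replica, otherwise take the first volume with space.
    public class PreferNominatedVolumePolicy<V extends FsVolumeSpi>
        implements VolumeChoosingPolicy<V> {
      @Override
      public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
          throws IOException {
        for (V v : volumes) {
          if (storageId != null && storageId.equals(v.getStorageID())
              && v.getAvailable() >= replicaSize) {
            return v;  // the volume the namenode asked for still fits the block
          }
        }
        for (V v : volumes) {
          if (v.getAvailable() >= replicaSize) {
            return v;  // fallback when the hint is missing or that volume is full
          }
        }
        throw new DiskOutOfSpaceException(
            "No volume has " + replicaSize + " bytes available");
      }
    }
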

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 169e0e6..9a5002a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -927,7 +927,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
-      StorageType targetStorageType) throws IOException {
+      StorageType targetStorageType, String targetStorageId)
+      throws IOException {
     ReplicaInfo replicaInfo = getReplicaInfo(block);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
       throw new ReplicaNotFoundException(
@@ -952,7 +953,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
+      volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
+          block.getNumBytes());
     }
     try {
       moveBlock(block, replicaInfo, volumeRef);
@@ -1298,11 +1300,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   @Override // FsDatasetSpi
   public ReplicaHandler createRbw(
-      StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
-      throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b,
+      boolean allowLazyPersist) throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
@@ -1335,7 +1337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       if (ref == null) {
-        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+        ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
       }
 
       FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1503,7 +1505,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b)
+      throws IOException {
     long startTimeMs = Time.monotonicNow();
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
@@ -1516,7 +1519,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
           }
           FsVolumeReference ref =
-              volumes.getNextVolume(storageType, b.getNumBytes());
+              volumes.getNextVolume(storageType, storageId, b.getNumBytes());
           FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
           ReplicaInPipeline newReplicaInfo;
           try {
@@ -2899,7 +2902,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                 replicaInfo.getVolume().isTransientStorage()) {
               // Pick a target volume to persist the block.
               targetReference = volumes.getNextVolume(
-                  StorageType.DEFAULT, replicaInfo.getNumBytes());
+                  StorageType.DEFAULT, null, replicaInfo.getNumBytes());
               targetVolume = (FsVolumeImpl) targetReference.getVolume();
 
               ramDiskReplicaTracker.recordStartLazyPersist(

