Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/05/05 19:05:17 UTC

[1/2] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4e6bbd049 -> a3954ccab


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index e7f0228..75baf84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -81,10 +81,11 @@ class FsVolumeList {
     return Collections.unmodifiableList(volumes);
   }
 
-  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
-      throws IOException {
+  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
+      long blockSize, String storageId) throws IOException {
     while (true) {
-      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
+      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
+          storageId);
       try {
         return volume.obtainReference();
       } catch (ClosedChannelException e) {
@@ -100,18 +101,20 @@ class FsVolumeList {
    * Get next volume.
    *
    * @param blockSize free space needed on the volume
-   * @param storageType the desired {@link StorageType} 
+   * @param storageType the desired {@link StorageType}
+   * @param storageId the storage id which may or may not be used by
+   *                  the VolumeChoosingPolicy.
    * @return next volume to store the block in.
    */
-  FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
-      throws IOException {
+  FsVolumeReference getNextVolume(StorageType storageType, String storageId,
+      long blockSize) throws IOException {
     final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
     for(FsVolumeImpl v : volumes) {
       if (v.getStorageType() == storageType) {
         list.add(v);
       }
     }
-    return chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize, storageId);
   }
 
   /**
@@ -129,7 +132,7 @@ class FsVolumeList {
         list.add(v);
       }
     }
-    return chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize, null);
   }
 
   long getDfsUsed() throws IOException {
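
For illustration only (not part of the patch): a minimal sketch of how the reworked selection path might be driven. FsVolumeList is package-private, so this assumes code living in the same fsdataset.impl package; the VolumeSelectionSketch class and its pickForWrite helper are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;

    class VolumeSelectionSketch {
      private final FsVolumeList volumes;   // assumed to be wired up elsewhere

      VolumeSelectionSketch(FsVolumeList volumes) {
        this.volumes = volumes;
      }

      // The storage ID is threaded through to the configured
      // VolumeChoosingPolicy, which may or may not use it; passing null, as
      // the other FsVolumeList caller in this patch does, leaves the choice
      // entirely to the policy.
      FsVolumeReference pickForWrite(long blockSize, String storageId)
          throws IOException {
        return volumes.getNextVolume(StorageType.DEFAULT, storageId, blockSize);
      }
    }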

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 74cdeae..c98a336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1018,7 +1018,8 @@ public class DFSTestUtil {
       // send the request
       new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
           dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
-          new StorageType[]{StorageType.DEFAULT});
+          new StorageType[]{StorageType.DEFAULT},
+          new String[0]);
       out.flush();
 
       return BlockOpResponseProto.parseDelimitedFrom(in);
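
For illustration only (not part of the patch): the same transferBlock change seen from a caller's perspective. The TransferBlockSketch class is hypothetical; the output stream, block, and target are assumed to be set up as in the test above, and the empty String[] mirrors the test's empty target-storage-ID list.

    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    class TransferBlockSketch {
      static void requestTransfer(DataOutputStream out, ExtendedBlock b,
          String clientName, DatanodeInfo target) throws IOException {
        new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
            clientName, new DatanodeInfo[]{target},
            new StorageType[]{StorageType.DEFAULT},
            new String[0]);   // new trailing argument: target storage IDs
        out.flush();
      }
    }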

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index b6884da..3a8fb59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -1448,12 +1448,33 @@ public class TestBlockStoragePolicy {
     testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
         StorageType.SSD, StorageType.ARCHIVE},
         new StorageType[]{StorageType.DISK}, false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.DISK, StorageType.SSD},
+        new StorageType[]{StorageType.SSD},
+        true);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK},
+        new StorageType[]{StorageType.DISK}, false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE},
+        new StorageType[]{StorageType.DISK},
+        false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE},
+        new StorageType[]{StorageType.DISK},
+        false);
+
   }
 
   private void testStorageTypeCheckAccessResult(StorageType[] requested,
       StorageType[] allowed, boolean expAccess) {
     try {
-      BlockTokenSecretManager.checkAccess(requested, allowed);
+      BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes");
       if (!expAccess) {
         fail("No expected access with allowed StorageTypes "
             + Arrays.toString(allowed) + " and requested StorageTypes "
@@ -1467,4 +1488,56 @@ public class TestBlockStoragePolicy {
       }
     }
   }
+
+  @Test
+  public void testStorageIDCheckAccess() {
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1"},
+        new String[]{"DN1-Storage1"}, true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+        new String[]{"DN1-Storage1"},
+        true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+        new String[]{"DN1-Storage1", "DN1-Storage2"}, false);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1", "DN1-Storage2"},
+        new String[]{"DN1-Storage1"}, true);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1", "DN1-Storage2"},
+        new String[]{"DN2-Storage1"}, false);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage2", "DN2-Storage2"},
+        new String[]{"DN1-Storage1", "DN2-Storage1"}, false);
+
+    testStorageIDCheckAccessResult(new String[0], new String[0], false);
+
+    testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"},
+        true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0],
+        false);
+  }
+
+  private void testStorageIDCheckAccessResult(String[] requested,
+          String[] allowed, boolean expAccess) {
+    try {
+      BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs");
+      if (!expAccess) {
+        fail("No expected access with allowed StorageIDs"
+            + Arrays.toString(allowed) + " and requested StorageIDs"
+            + Arrays.toString(requested));
+      }
+    } catch (SecretManager.InvalidToken e) {
+      if (expAccess) {
+        fail("Expected access with allowed StorageIDs "
+            + Arrays.toString(allowed) + " and requested StorageIDs"
+            + Arrays.toString(requested));
+      }
+    }
+  }
 }
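
For illustration only (not part of the patch): a standalone sketch of the two static checkAccess overloads these tests exercise. The CheckAccessSketch class is hypothetical, the argument values are copied from cases above that expect access to be granted, and the third parameter ("StorageTypes" / "StorageIDs") is assumed to be just a label used in the InvalidToken message.

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.security.token.SecretManager;

    public class CheckAccessSketch {
      public static void main(String[] args) {
        StorageType[] requestedTypes = {StorageType.DISK, StorageType.SSD};
        StorageType[] allowedTypes = {StorageType.SSD};
        String[] requestedIds = {"DN1-Storage1", "DN1-Storage2"};
        String[] allowedIds = {"DN1-Storage1"};
        try {
          BlockTokenSecretManager.checkAccess(requestedTypes, allowedTypes,
              "StorageTypes");
          BlockTokenSecretManager.checkAccess(requestedIds, allowedIds,
              "StorageIDs");
          System.out.println("requested storage types and IDs were accepted");
        } catch (SecretManager.InvalidToken e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }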

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 3f4fe28..7a2ac1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -559,6 +559,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false,
+        null, null, new String[0]);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..e159914 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,11 +98,11 @@ public class TestWriteBlockGetsBlockLengthHint {
      * correctly propagate the hint to FsDatasetSpi.
      */
     @Override
-    public synchronized ReplicaHandler createRbw(
-        StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+    public synchronized ReplicaHandler createRbw(StorageType storageType,
+        String storageId, ExtendedBlock b, boolean allowLazyPersist)
         throws IOException {
       assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
-      return super.createRbw(storageType, b, allowLazyPersist);
+      return super.createRbw(storageType, storageId, b, allowLazyPersist);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index e98207f..747f295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -151,7 +151,7 @@ public class TestBlockToken {
         assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
         sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
             BlockTokenIdentifier.AccessMode.WRITE,
-            new StorageType[]{StorageType.DEFAULT});
+            new StorageType[]{StorageType.DEFAULT}, null);
         result = id.getBlockId();
       }
       return GetReplicaVisibleLengthResponseProto.newBuilder()
@@ -160,11 +160,11 @@ public class TestBlockToken {
   }
 
   private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
-                                               ExtendedBlock block,
-                                               EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
-                                               StorageType... storageTypes) throws IOException {
+      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
+      StorageType[] storageTypes, String[] storageIds)
+      throws IOException {
     Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
-        storageTypes);
+        storageTypes, storageIds);
     BlockTokenIdentifier id = sm.createIdentifier();
     id.readFields(new DataInputStream(new ByteArrayInputStream(token
         .getIdentifier())));
@@ -178,29 +178,28 @@ public class TestBlockToken {
         enableProtobuf);
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block2,
         EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     // We must be backwards compatible when adding storageType
     TestWritable.testWritable(generateTokenId(sm, block3,
-        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        (StorageType[]) null));
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.EMPTY_ARRAY));
+        StorageType.EMPTY_ARRAY, null));
   }
 
   @Test
@@ -215,35 +214,36 @@ public class TestBlockToken {
 
   private static void checkAccess(BlockTokenSecretManager m,
       Token<BlockTokenIdentifier> t, ExtendedBlock blk,
-      BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
-    m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
+      BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
+      String[] storageIds) throws SecretManager.InvalidToken {
+    m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
   }
 
   private void tokenGenerationAndVerification(BlockTokenSecretManager master,
-      BlockTokenSecretManager slave, StorageType... storageTypes)
-      throws Exception {
+      BlockTokenSecretManager slave, StorageType[] storageTypes,
+      String[] storageIds) throws Exception {
     // single-mode tokens
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
       // generated by master
       Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
-          EnumSet.of(mode), storageTypes);
-      checkAccess(master, token1, block1, mode);
-      checkAccess(slave, token1, block1, mode);
+          EnumSet.of(mode), storageTypes, storageIds);
+      checkAccess(master, token1, block1, mode, storageTypes, storageIds);
+      checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
       // generated by slave
       Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
-          EnumSet.of(mode), storageTypes);
-      checkAccess(master, token2, block2, mode);
-      checkAccess(slave, token2, block2, mode);
+          EnumSet.of(mode), storageTypes, storageIds);
+      checkAccess(master, token2, block2, mode, storageTypes, storageIds);
+      checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
     }
     // multi-mode tokens
     Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        storageTypes);
+        storageTypes, storageIds);
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
-      checkAccess(master, mtoken, block3, mode);
-      checkAccess(slave, mtoken, block3, mode);
+      checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
+      checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
     }
   }
 
@@ -259,18 +259,18 @@ public class TestBlockToken {
     ExportedBlockKeys keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
     // key updating
     masterHandler.updateKeys();
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
     keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
   }
 
   @Test
@@ -315,7 +315,7 @@ public class TestBlockToken {
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
 
     final Server server = createMockDatanode(sm, token, conf);
 
@@ -365,7 +365,7 @@ public class TestBlockToken {
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
 
     final Server server = createMockDatanode(sm, token, conf);
     server.start();
@@ -451,19 +451,23 @@ public class TestBlockToken {
 
       ExportedBlockKeys keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
+      String[] storageIds = new String[] {"DS-9001"};
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, storageIds);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
       // Test key updating
       masterHandler.updateKeys();
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, storageIds);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
       keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
     }
   }
 
@@ -540,7 +544,7 @@ public class TestBlockToken {
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -605,7 +609,7 @@ public class TestBlockToken {
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.EMPTY_ARRAY);
+        StorageType.EMPTY_ARRAY, new String[0]);
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -699,7 +703,8 @@ public class TestBlockToken {
      */
     BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
         "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
+        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+        new String[] {"fake-storage-id"}, true);
     Calendar cal = new GregorianCalendar();
     cal.set(2017, 1, 9, 0, 12, 35);
     long datetime = cal.getTimeInMillis();
@@ -749,7 +754,8 @@ public class TestBlockToken {
         new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
             StorageType.DISK, StorageType.ARCHIVE};
     BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
-        123, accessModes, storageTypes, useProto);
+        123, accessModes, storageTypes, new String[] {"fake-storage-id"},
+        useProto);
     ident.setExpiryDate(1487080345L);
     BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
     assertEquals(ret.getExpiryDate(), 1487080345L);
@@ -760,6 +766,7 @@ public class TestBlockToken {
     assertEquals(ret.getAccessModes(),
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
     assertArrayEquals(ret.getStorageTypes(), storageTypes);
+    assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"});
   }
 
   @Test
@@ -767,5 +774,4 @@ public class TestBlockToken {
     testBlockTokenSerialization(false);
     testBlockTokenSerialization(true);
   }
-
 }
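
For illustration only (not part of the patch): a sketch of the extended token round trip that the tests above cover. The StorageIdTokenSketch class is hypothetical, and the BlockTokenSecretManager instance is assumed to be configured elsewhere (its construction is outside this excerpt).

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.security.token.SecretManager;
    import org.apache.hadoop.security.token.Token;

    class StorageIdTokenSketch {
      static void issueAndVerify(BlockTokenSecretManager sm, ExtendedBlock block)
          throws IOException {
        StorageType[] storageTypes = {StorageType.DEFAULT};
        String[] storageIds = {"DS-9001"};   // same ID as in the test above

        // The storage types and storage IDs are now baked into the token...
        Token<BlockTokenIdentifier> token = sm.generateToken(block,
            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
            storageTypes, storageIds);
        try {
          // ...and the verifier takes them as additional arguments,
          // presumably comparing them against what the token grants.
          sm.checkAccess(token, null, block,
              BlockTokenIdentifier.AccessMode.WRITE, storageTypes, storageIds);
        } catch (SecretManager.InvalidToken e) {
          // Raised when the token does not cover the requested access.
        }
      }
    }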

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 6810a0b..c9ff572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -389,7 +389,7 @@ public abstract class BlockReportTestBase {
     // Create a bogus new block which will not be present on the namenode.
     ExtendedBlock b = new ExtendedBlock(
         poolId, rand.nextLong(), 1024L, rand.nextLong());
-    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
+    dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false);
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cd3befd..18b4922 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1023,21 +1023,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createRbw(
-      StorageType storageType, ExtendedBlock b,
+      StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    return createTemporary(storageType, b);
+    return createTemporary(storageType, storageId, b);
   }
 
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b)
+      throws IOException {
     if (isValidBlock(b)) {
-          throw new ReplicaAlreadyExistsException("Block " + b + 
-              " is valid, and cannot be written to.");
-      }
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " is valid, and cannot be written to.");
+    }
     if (isValidRbw(b)) {
-        throw new ReplicaAlreadyExistsException("Block " + b + 
-            " is being written, and cannot be written to.");
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " is being written, and cannot be written to.");
     }
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
@@ -1419,7 +1420,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
-      StorageType targetStorageType) throws IOException {
+      StorageType targetStorageType, String storageId) throws IOException {
     // TODO Auto-generated method stub
     return null;
   }
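
For illustration only (not part of the patch): a sketch of the new FsDatasetSpi write-path signature. The CreateReplicaSketch class is hypothetical and the FsDatasetSpi instance is assumed to come from a running DataNode; per the FsVolumeList Javadoc earlier in this patch, the storage ID is forwarded to the volume choosing policy, which may or may not use it, and null keeps the previous behaviour.

    import java.io.IOException;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    class CreateReplicaSketch {
      static void createOnStorage(FsDatasetSpi<?> dataset, ExtendedBlock block,
          String storageId) throws IOException {
        // createRbw now takes the storage ID right after the storage type.
        try (ReplicaHandler rbw =
            dataset.createRbw(StorageType.DEFAULT, storageId, block, false)) {
          // ... write to rbw.getReplica() ...
        }
      }
    }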

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 579252b..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -647,7 +647,7 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    dn.data.createRbw(StorageType.DEFAULT, block, false);
+    dn.data.createRbw(StorageType.DEFAULT, null, block, false);
     BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
         recoveryWorker.new RecoveryTaskContiguous(rBlock);
     try {
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipeline replicaInfo = dn.data.createRbw(
-        StorageType.DEFAULT, block, false).getReplica();
+        StorageType.DEFAULT, null, block, false).getReplica();
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
@@ -972,7 +972,7 @@ public class TestBlockRecovery {
           // Register this thread as the writer for the recoveringBlock.
           LOG.debug("slowWriter creating rbw");
           ReplicaHandler replicaHandler =
-              spyDN.data.createRbw(StorageType.DISK, block, false);
+              spyDN.data.createRbw(StorageType.DISK, null, block, false);
           replicaHandler.close();
           LOG.debug("slowWriter created rbw");
           // Tell the parent thread to start progressing.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index f811bd8..8992d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -394,7 +394,7 @@ public class TestBlockReplacement {
       DataOutputStream out = new DataOutputStream(sock.getOutputStream());
       new Sender(out).replaceBlock(block, targetStorageType,
           BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
-          sourceProxy);
+          sourceProxy, null);
       out.flush();
       // receiveResponse
       DataInputStream reply = new DataInputStream(sock.getInputStream());
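
For illustration only (not part of the patch): the replaceBlock change from a caller's perspective. The ReplaceBlockSketch class is hypothetical; the socket stream and datanode details are assumed to be set up as in the test above.

    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

    class ReplaceBlockSketch {
      static void requestReplace(DataOutputStream out, ExtendedBlock block,
          StorageType targetStorageType, String sourceDatanodeUuid,
          DatanodeInfo sourceProxy, String targetStorageId) throws IOException {
        // The final argument is new in this patch; the test above passes null,
        // leaving the target storage unspecified.
        new Sender(out).replaceBlock(block, targetStorageType,
            BlockTokenSecretManager.DUMMY_TOKEN, sourceDatanodeUuid,
            sourceProxy, targetStorageId);
        out.flush();
      }
    }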

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index b2bfe49..8fda664 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -129,7 +129,7 @@ public class TestDataXceiverLazyPersistHint {
         DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
         CachingStrategy.newDefaultStrategy(),
         lazyPersist,
-        false, null);
+        false, null, null, new String[0]);
   }
 
   // Helper functions to setup the mock objects.
@@ -151,7 +151,7 @@ public class TestDataXceiverLazyPersistHint {
         any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
         anyString(), any(DatanodeInfo.class), any(DataNode.class),
         any(DataChecksum.class), any(CachingStrategy.class),
-        captor.capture(), anyBoolean());
+        captor.capture(), anyBoolean(), any(String.class));
     doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
         .getBufferedOutputStream();
     return xceiverSpy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index cd86720..38e4287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -167,7 +167,8 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false,
+        null, null, new String[0]);
     out.flush();
 
     // close the connection before sending the content of the block
@@ -274,7 +275,7 @@ public class TestDiskError {
             dn1.getDatanodeId());
 
     dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
-        new StorageType[]{StorageType.DISK});
+        new StorageType[]{StorageType.DISK}, new String[0]);
     // Sleep for 1 second so the DataTrasnfer daemon can start transfer.
     try {
       Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..2e69595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -81,7 +81,7 @@ public class TestSimulatedFSDataset {
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipeline bInfo = fsdataset.createRbw(
-          StorageType.DEFAULT, b, false).getReplica();
+          StorageType.DEFAULT, null, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
           ExtendedBlock block = new ExtendedBlock(newbpid,1);
           try {
             // it will throw an exception if the block pool is not found
-            fsdataset.createTemporary(StorageType.DEFAULT, block);
+            fsdataset.createTemporary(StorageType.DEFAULT, null, block);
           } catch (IOException ioe) {
             // JUnit does not capture exception in non-main thread,
             // so cache it and then let main thread throw later.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 62ef731..2e439d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -138,14 +138,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType t, String i,
+      ExtendedBlock b)
       throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
-  public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
-      throws IOException {
+  public ReplicaHandler createRbw(StorageType storageType, String id,
+      ExtendedBlock b, boolean tf) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
@@ -332,7 +333,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType, String storageId) throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
index 9414a0e..24a43e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
@@ -89,10 +89,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -115,21 +117,29 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // Third volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
 
     // All writes should be assigned to the volume with the least free space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -156,22 +166,30 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // Fourth volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
 
     // We should alternate assigning between the two volumes with less free
     // space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+         null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -190,13 +208,14 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // All writes should be assigned to the volume with the least free space.
     // However, if the volume with the least free space doesn't have enough
     // space to accept the replica size, and another volume does have enough
     // free space, that should be chosen instead.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        1024L * 1024L * 2, null));
   }
   
   @Test(timeout=60000)
@@ -220,10 +239,11 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
-    
+
     // Should still be able to get a volume for the replica even though the
     // available space on the second volume changed.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        100, null));
   }
   
   @Test(timeout=60000)
@@ -271,12 +291,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
       Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
       volumes.add(volume);
     }
-    
+
     initPolicy(policy, preferencePercent);
     long lowAvailableSpaceVolumeSelected = 0;
     long highAvailableSpaceVolumeSelected = 0;
     for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
-      FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
+      FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null);
       for (int j = 0; j < volumes.size(); j++) {
         // Note how many times the first low available volume was selected
         if (volume == volumes.get(j) && j == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
index 9b3047f..44e2a30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
@@ -50,20 +50,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
     // Second volume, with 200 bytes of space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-    
+
     // Test two rounds of round-robin choosing
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
 
     // The first volume has only 100L space, so the policy should
     // wisely choose the second one in case we ask for more.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150,
+        null));
 
     // Fail if no volume can be chosen?
     try {
-      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      policy.chooseVolume(volumes, Long.MAX_VALUE, null);
       Assert.fail();
     } catch (IOException e) {
       // Passed.
@@ -93,7 +94,7 @@ public class TestRoundRobinVolumeChoosingPolicy {
 
     int blockSize = 700;
     try {
-      policy.chooseVolume(volumes, blockSize);
+      policy.chooseVolume(volumes, blockSize, null);
       Assert.fail("expected to throw DiskOutOfSpaceException");
     } catch(DiskOutOfSpaceException e) {
       Assert.assertEquals("Not returnig the expected message",
@@ -137,21 +138,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
     Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
 
     Assert.assertEquals(diskVolumes.get(0),
-            policy.chooseVolume(diskVolumes, 0));
+            policy.chooseVolume(diskVolumes, 0, null));
     // Independent Round-Robin for different storage type
     Assert.assertEquals(ssdVolumes.get(0),
-            policy.chooseVolume(ssdVolumes, 0));
+            policy.chooseVolume(ssdVolumes, 0, null));
     // Take block size into consideration
     Assert.assertEquals(ssdVolumes.get(0),
-            policy.chooseVolume(ssdVolumes, 150L));
+            policy.chooseVolume(ssdVolumes, 150L, null));
 
     Assert.assertEquals(diskVolumes.get(1),
-            policy.chooseVolume(diskVolumes, 0));
+            policy.chooseVolume(diskVolumes, 0, null));
     Assert.assertEquals(diskVolumes.get(0),
-            policy.chooseVolume(diskVolumes, 50L));
+            policy.chooseVolume(diskVolumes, 50L, null));
 
     try {
-      policy.chooseVolume(diskVolumes, 200L);
+      policy.chooseVolume(diskVolumes, 200L, null);
       Assert.fail("Should throw an DiskOutOfSpaceException before this!");
     } catch (DiskOutOfSpaceException e) {
       // Pass.
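
For illustration only (not part of the patch): a sketch of the three-argument chooseVolume that both policy tests now call. The ChooseVolumeSketch class is hypothetical; the candidate volumes are assumed to be provided by the caller, and the storage ID is a hint the policy may ignore (every call in these tests passes null).

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;

    class ChooseVolumeSketch {
      static FsVolumeSpi choose(List<FsVolumeSpi> candidates, long blockSize,
          String storageId) throws IOException {
        VolumeChoosingPolicy<FsVolumeSpi> policy =
            new RoundRobinVolumeChoosingPolicy<>();
        // Throws DiskOutOfSpaceException (an IOException) if no candidate
        // has blockSize bytes available, as exercised in the tests above.
        return policy.chooseVolume(candidates, blockSize, storageId);
      }
    }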

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 905c3f0..3293561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -259,7 +259,7 @@ public class TestFsDatasetImpl {
       String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
       try (ReplicaHandler replica =
-          dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
       }
     }
     final String[] dataDirs =
@@ -566,7 +566,7 @@ public class TestFsDatasetImpl {
     class ResponderThread extends Thread {
       public void run() {
         try (ReplicaHandler replica = dataset
-            .createRbw(StorageType.DEFAULT, eb, false)) {
+            .createRbw(StorageType.DEFAULT, null, eb, false)) {
           LOG.info("CreateRbw finished");
           startFinalizeLatch.countDown();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 83c15ca..ee3a79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -101,7 +101,7 @@ public class TestFsVolumeList {
     }
     for (int i = 0; i < 10; i++) {
       try (FsVolumeReference ref =
-          volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+          volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) {
         // volume No.2 will not be chosen.
         assertNotEquals(ref.getVolume(), volumes.get(1));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index da53cae..11525ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -353,7 +353,7 @@ public class TestWriteToReplica {
     }
  
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -371,7 +371,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -381,7 +381,7 @@ public class TestWriteToReplica {
         0L, blocks[RBW].getNumBytes());  // expect to be successful
     
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -397,7 +397,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -413,7 +413,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -430,49 +430,49 @@ public class TestWriteToReplica {
           e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
     }
     
-    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
+    dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false);
   }
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
       Assert.fail("Should not have created a temporary replica that was " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
  
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
       Assert.fail("Should not have created a replica that had created as" +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
-    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
 
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -485,7 +485,8 @@ public class TestWriteToReplica {
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
       ReplicaInPipeline replicaInfo =
-          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+          dataSet.createTemporary(StorageType.DEFAULT, null,
+              blocks[NON_EXISTENT]).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
new file mode 100644
index 0000000..e0f7426
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to ensure that the StorageType and StorageID sent from Namenode
+ * to DFSClient are respected.
+ */
+public class TestNamenodeStorageDirectives {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestNamenodeStorageDirectives.class);
+
+  private static final int BLOCK_SIZE = 512;
+
+  private MiniDFSCluster cluster;
+
+  @After
+  public void tearDown() {
+    shutdown();
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode, StorageType[][] storageTypes)
+      throws IOException {
+    startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode,
+        storageTypes, RoundRobinVolumeChoosingPolicy.class,
+        BlockPlacementPolicyDefault.class);
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode, StorageType[][] storageTypes,
+      Class<? extends VolumeChoosingPolicy> volumeChoosingPolicy,
+      Class<? extends BlockPlacementPolicy> blockPlacementPolicy) throws
+      IOException {
+    shutdown();
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+    /* Allow 1 volume failure */
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setClass(
+        DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
+        volumeChoosingPolicy, VolumeChoosingPolicy.class);
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        blockPlacementPolicy, BlockPlacementPolicy.class);
+
+    MiniDFSNNTopology nnTopology =
+        MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(nnTopology)
+        .numDataNodes(numDataNodes)
+        .storagesPerDatanode(storagePerDataNode)
+        .storageTypes(storageTypes)
+        .build();
+    cluster.waitActive();
+  }
+
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(Path path, int numBlocks, short replicateFactor)
+      throws IOException, InterruptedException, TimeoutException {
+    createFile(0, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks,
+      short replicateFactor)
+      throws IOException, TimeoutException, InterruptedException {
+    final int seed = 0;
+    final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+        replicateFactor, seed);
+    DFSTestUtil.waitReplication(fs, path, replicateFactor);
+  }
+
+  private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks,
+      StorageType storageType) throws IOException {
+    MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0];
+    InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
+    assert addr.getPort() != 0;
+    DFSClient client = new DFSClient(addr, cluster.getConfiguration(0));
+
+    FileSystem fs = cluster.getFileSystem();
+
+    if (!fs.exists(path)) {
+      LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path);
+      return false;
+    }
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    int foundBlocks = 0;
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      for (StorageType st : locatedBlock.getStorageTypes()) {
+        if (st == storageType) {
+          foundBlocks++;
+        }
+      }
+    }
+
+    LOG.info("Found {}/{} blocks on StorageType {}",
+        foundBlocks, numBlocks, storageType);
+    final boolean isValid = foundBlocks >= numBlocks;
+    return isValid;
+  }
+
+  private void testStorageTypes(StorageType[][] storageTypes,
+      String storagePolicy, StorageType[] expectedStorageTypes,
+      StorageType[] unexpectedStorageTypes) throws ReconfigurationException,
+      InterruptedException, TimeoutException, IOException {
+    final int numDataNodes = storageTypes.length;
+    final int storagePerDataNode = storageTypes[0].length;
+    startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes);
+    cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy);
+    Path testFile = new Path("/test");
+    final short replFactor = 2;
+    final int numBlocks = 10;
+    createFile(testFile, numBlocks, replFactor);
+
+    for (StorageType storageType: expectedStorageTypes) {
+      assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks,
+          storageType));
+    }
+
+    for (StorageType storageType: unexpectedStorageTypes) {
+      assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks,
+          storageType));
+    }
+  }
+
+  /**
+   * Verify that writing to SSD and DISK will write to the correct Storage
+   * Types.
+   * @throws IOException
+   */
+  @Test(timeout=60000)
+  public void testTargetStorageTypes() throws ReconfigurationException,
+      InterruptedException, TimeoutException, IOException {
+    // ONE_SSD: replicas land on SSD and DISK, and nothing else.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "ONE_SSD",
+        new StorageType[]{StorageType.SSD, StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE});
+    // only on SSD.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "ALL_SSD",
+        new StorageType[]{StorageType.SSD},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+            StorageType.ARCHIVE});
+    // only on SSD.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK, StorageType.DISK}},
+        "ALL_SSD",
+        new StorageType[]{StorageType.SSD},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+            StorageType.ARCHIVE});
+
+    // DISK and not anything else.
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "HOT",
+        new StorageType[]{StorageType.DISK},
+        new StorageType[] {StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE});
+
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+        "WARM",
+        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD});
+
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+        "COLD",
+        new StorageType[]{StorageType.ARCHIVE},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.DISK});
+
+    // We wait for Lazy Persist to write to disk.
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "LAZY_PERSIST",
+        new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE});
+  }
+
+  /**
+   * A VolumeChoosingPolicy test stub verifying that the storageId passed to
+   * chooseVolume matches the storage selected by the BlockPlacementPolicy.
+   * @param <V> the volume type.
+   */
+  private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
+      extends RoundRobinVolumeChoosingPolicy<V> {
+    static String expectedStorageId;
+
+    @Override
+    public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+        throws IOException {
+      assertEquals(expectedStorageId, storageId);
+      return super.chooseVolume(volumes, replicaSize, storageId);
+    }
+  }
+
+  private static class TestBlockPlacementPolicy
+      extends BlockPlacementPolicyDefault {
+    static DatanodeStorageInfo[] dnStorageInfosToReturn;
+
+    @Override
+    public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+        Node writer, List<DatanodeStorageInfo> chosenNodes,
+        boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
+        final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
+      return dnStorageInfosToReturn;
+    }
+  }
+
+  private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
+      throws UnregisteredNodeException {
+    if (cluster == null) {
+      return null;
+    }
+    DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
+    DatanodeManager dnManager = cluster.getNamesystem()
+            .getBlockManager().getDatanodeManager();
+    return dnManager.getDatanode(dnId).getStorageInfos()[0];
+  }
+
+  @Test(timeout=60000)
+  public void testStorageIDBlockPlacementSpecific()
+      throws ReconfigurationException, InterruptedException, TimeoutException,
+      IOException {
+    final StorageType[][] storageTypes = {
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+    };
+    final int numDataNodes = storageTypes.length;
+    final int storagePerDataNode = storageTypes[0].length;
+    startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
+        TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
+    Path testFile = new Path("/test");
+    final short replFactor = 1;
+    final int numBlocks = 10;
+    DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
+    TestBlockPlacementPolicy.dnStorageInfosToReturn =
+        new DatanodeStorageInfo[] {dnInfoToUse};
+    TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
+    // File creation invokes both BlockPlacementPolicy and VolumeChoosingPolicy,
+    // and will verify that the storage IDs match.
+    createFile(testFile, numBlocks, replFactor);
+  }
+}
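
The two stubs above only assert on the storageId they receive. For reference, an illustrative policy (not part of this patch) that actually honours the hint could look like the sketch below; it assumes FsVolumeSpi#getStorageID() and FsVolumeSpi#getAvailable() behave as in trunk and falls back to round-robin when no usable match exists.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;

/** Illustrative only: prefer the volume whose storage ID matches the hint. */
class StorageIdAwareVolumeChoosingPolicy<V extends FsVolumeSpi>
    extends RoundRobinVolumeChoosingPolicy<V> {
  @Override
  public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
      throws IOException {
    if (storageId != null && !storageId.isEmpty()) {
      for (V volume : volumes) {
        // Use the hinted volume when it is present and has room for the replica.
        if (storageId.equals(volume.getStorageID())
            && volume.getAvailable() >= replicaSize) {
          return volume;
        }
      }
    }
    // No hint, or no usable match: keep the round-robin behaviour.
    return super.chooseVolume(volumes, replicaSize, storageId);
  }
}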




[2/2] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs

Posted by cd...@apache.org.
HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3954cca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3954cca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3954cca

Branch: refs/heads/trunk
Commit: a3954ccab148bddc290cb96528e63ff19799bcc9
Parents: 4e6bbd0
Author: Chris Douglas <cd...@apache.org>
Authored: Fri May 5 12:01:26 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri May 5 12:01:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  35 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java |  10 +-
 .../datatransfer/DataTransferProtocol.java      |  19 +-
 .../hdfs/protocol/datatransfer/Sender.java      |  29 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  13 +
 .../token/block/BlockTokenIdentifier.java       |  36 +-
 .../src/main/proto/datatransfer.proto           |   4 +
 .../src/main/proto/hdfs.proto                   |   1 +
 .../hdfs/protocol/datatransfer/Receiver.java    |  20 +-
 .../block/BlockPoolTokenSecretManager.java      |  22 +-
 .../token/block/BlockTokenSecretManager.java    |  55 ++--
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   5 +-
 .../hadoop/hdfs/server/balancer/KeyManager.java |   4 +-
 .../server/blockmanagement/BlockManager.java    |   6 +-
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../hdfs/server/datanode/BlockReceiver.java     |  12 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  42 ++-
 .../hdfs/server/datanode/DataXceiver.java       |  68 ++--
 .../erasurecode/ErasureCodingWorker.java        |   3 +-
 .../erasurecode/StripedBlockReader.java         |   2 +-
 .../erasurecode/StripedBlockWriter.java         |  10 +-
 .../erasurecode/StripedReconstructionInfo.java  |  16 +-
 .../datanode/erasurecode/StripedWriter.java     |   5 +-
 .../AvailableSpaceVolumeChoosingPolicy.java     |  20 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   6 +-
 .../RoundRobinVolumeChoosingPolicy.java         |   2 +-
 .../fsdataset/VolumeChoosingPolicy.java         |   5 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  21 +-
 .../datanode/fsdataset/impl/FsVolumeList.java   |  19 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   3 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |  75 ++++-
 .../hadoop/hdfs/TestDataTransferProtocol.java   |   3 +-
 .../hdfs/TestWriteBlockGetsBlockLengthHint.java |   6 +-
 .../security/token/block/TestBlockToken.java    |  98 +++---
 .../server/datanode/BlockReportTestBase.java    |   2 +-
 .../server/datanode/SimulatedFSDataset.java     |  19 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   6 +-
 .../server/datanode/TestBlockReplacement.java   |   2 +-
 .../TestDataXceiverLazyPersistHint.java         |   4 +-
 .../hdfs/server/datanode/TestDiskError.java     |   5 +-
 .../server/datanode/TestSimulatedFSDataset.java |   4 +-
 .../extdataset/ExternalDatasetImpl.java         |  10 +-
 .../TestAvailableSpaceVolumeChoosingPolicy.java |  76 +++--
 .../TestRoundRobinVolumeChoosingPolicy.java     |  29 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |   4 +-
 .../fsdataset/impl/TestFsVolumeList.java        |   2 +-
 .../fsdataset/impl/TestWriteToReplica.java      |  29 +-
 .../namenode/TestNamenodeStorageDirectives.java | 330 +++++++++++++++++++
 48 files changed, 903 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0268537..49c17b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -174,10 +174,12 @@ class DataStreamer extends Daemon {
 
     void sendTransferBlock(final DatanodeInfo[] targets,
         final StorageType[] targetStorageTypes,
+        final String[] targetStorageIDs,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //send the TRANSFER_BLOCK request
       new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
-          dfsClient.clientName, targets, targetStorageTypes);
+          dfsClient.clientName, targets, targetStorageTypes,
+          targetStorageIDs);
       out.flush();
       //ack
       BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -1367,9 +1369,11 @@ class DataStreamer extends Daemon {
       final DatanodeInfo src = original[tried % original.length];
       final DatanodeInfo[] targets = {nodes[d]};
       final StorageType[] targetStorageTypes = {storageTypes[d]};
+      final String[] targetStorageIDs = {storageIDs[d]};
 
       try {
-        transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        transfer(src, targets, targetStorageTypes, targetStorageIDs,
+            lb.getBlockToken());
       } catch (IOException ioe) {
         DFSClient.LOG.warn("Error transferring data from " + src + " to " +
             nodes[d] + ": " + ioe.getMessage());
@@ -1400,6 +1404,7 @@ class DataStreamer extends Daemon {
 
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
                         final StorageType[] targetStorageTypes,
+                        final String[] targetStorageIDs,
                         final Token<BlockTokenIdentifier> blockToken)
       throws IOException {
     //transfer replica to the new datanode
@@ -1412,7 +1417,8 @@ class DataStreamer extends Daemon {
 
         streams = new StreamerStreams(src, writeTimeout, readTimeout,
             blockToken);
-        streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+        streams.sendTransferBlock(targets, targetStorageTypes,
+            targetStorageIDs, blockToken);
         return;
       } catch (InvalidEncryptionKeyException e) {
         policy.recordFailure(e);
@@ -1440,11 +1446,12 @@ class DataStreamer extends Daemon {
       streamerClosed = true;
       return;
     }
-    setupPipelineInternal(nodes, storageTypes);
+    setupPipelineInternal(nodes, storageTypes, storageIDs);
   }
 
   protected void setupPipelineInternal(DatanodeInfo[] datanodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
@@ -1465,7 +1472,8 @@ class DataStreamer extends Daemon {
       accessToken = lb.getBlockToken();
 
       // set up the pipeline again with the remaining nodes
-      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+      success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
+          isRecovery);
 
       failPacket4Testing();
 
@@ -1601,7 +1609,8 @@ class DataStreamer extends Daemon {
   protected LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
+    String[] nextStorageIDs;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1617,10 +1626,12 @@ class DataStreamer extends Daemon {
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
+      nextStorageTypes = lb.getStorageTypes();
+      nextStorageIDs = lb.getStorageIDs();
 
       // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+      success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
+          0L, false);
 
       if (!success) {
         LOG.warn("Abandoning " + block);
@@ -1643,7 +1654,8 @@ class DataStreamer extends Daemon {
   // Returns true if success, otherwise return failure.
   //
   boolean createBlockOutputStream(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
+      long newGS, boolean recoveryFlag) {
     if (nodes.length == 0) {
       LOG.info("nodes are empty for write pipeline of " + block);
       return false;
@@ -1696,7 +1708,8 @@ class DataStreamer extends Daemon {
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS,
             checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
-            (targetPinnings != null && targetPinnings[0]), targetPinnings);
+            (targetPinnings != null && targetPinnings[0]), targetPinnings,
+            nodeStorageIDs[0], nodeStorageIDs);
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index b457edb..d920f18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -100,9 +100,11 @@ public class StripedDataStreamer extends DataStreamer {
 
     DatanodeInfo[] nodes = lb.getLocations();
     StorageType[] storageTypes = lb.getStorageTypes();
+    String[] storageIDs = lb.getStorageIDs();
 
     // Connect to the DataNode. If fail the internal error state will be set.
-    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+    success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
+        false);
 
     if (!success) {
       block.setCurrentBlock(null);
@@ -121,7 +123,8 @@ public class StripedDataStreamer extends DataStreamer {
 
   @Override
   protected void setupPipelineInternal(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     while (!success && !streamerClosed() && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
@@ -141,7 +144,8 @@ public class StripedDataStreamer extends DataStreamer {
       // set up the pipeline again with the remaining nodes. when a striped
       // data streamer comes here, it must be in external error state.
       assert getErrorState().hasExternalError();
-      success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+      success = createBlockOutputStream(nodes, nodeStorageTypes,
+          nodeStorageIDs, newGS, true);
 
       failPacket4Testing();
       getErrorState().checkRestartingNodeDeadline(nodes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 6c5883c..fe20c37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -101,6 +101,11 @@ public interface DataTransferProtocol {
    *                         written to disk lazily
    * @param pinning whether to pin the block, so Balancer won't move it.
    * @param targetPinnings whether to pin the block on target datanode
+   * @param storageID optional StorageID designating where to write the
+   *                  block. An empty String or null indicates that it
+   *                  has not been provided.
+   * @param targetStorageIDs target StorageIDs corresponding to the target
+   *                         datanodes.
    */
   void writeBlock(final ExtendedBlock blk,
       final StorageType storageType,
@@ -118,7 +123,9 @@ public interface DataTransferProtocol {
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException;
+      final boolean[] targetPinnings,
+      final String storageID,
+      final String[] targetStorageIDs) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be
@@ -129,12 +136,15 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param clientName client's name.
    * @param targets target datanodes.
+   * @param targetStorageIDs StorageIDs designating where to write the
+   *                         block on each target datanode.
    */
   void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException;
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIDs) throws IOException;
 
   /**
    * Request short circuit access file descriptors from a DataNode.
@@ -179,12 +189,15 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param delHint the hint for deleting the block in the original datanode.
    * @param source the source datanode for receiving the block.
+   * @param storageId an optional storage ID designating where the
+   *                  replacement block should be written.
    */
   void replaceBlock(final ExtendedBlock blk,
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException;
+      final DatanodeInfo source,
+      final String storageId) throws IOException;
 
   /**
    * Copy a block.

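A condensed sketch of how a client-side caller threads the new storage IDs through the Sender. The stream, block, token, and located-block values below are assumed; in DataStreamer they come from the LocatedBlock returned by the namenode via lb.getLocations(), lb.getStorageTypes(), and lb.getStorageIDs().

// Hypothetical values; in DataStreamer these come from a LocatedBlock `lb`.
DatanodeInfo[] targets = lb.getLocations();
StorageType[] targetStorageTypes = lb.getStorageTypes();
String[] targetStorageIds = lb.getStorageIDs();

new Sender(out).transferBlock(blk, blockToken, clientName,
    targets, targetStorageTypes, targetStorageIds);

// replaceBlock takes a single optional ID; passing null leaves the proto field
// unset and preserves the pre-HDFS-9807 behaviour.
new Sender(out).replaceBlock(blk, StorageType.DISK, blockToken,
    delHint, source, targetStorageIds[0]);
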
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index e133975..8a8d20d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -132,7 +133,9 @@ public class Sender implements DataTransferProtocol {
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
 
@@ -154,11 +157,14 @@ public class Sender implements DataTransferProtocol {
         .setCachingStrategy(getCachingStrategy(cachingStrategy))
         .setAllowLazyPersist(allowLazyPersist)
         .setPinning(pinning)
-        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
-
+        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1))
+        .addAllTargetStorageIds(PBHelperClient.convert(targetStorageIds, 1));
     if (source != null) {
       proto.setSource(PBHelperClient.convertDatanodeInfo(source));
     }
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }
 
     send(out, Op.WRITE_BLOCK, proto.build());
   }
@@ -168,7 +174,8 @@ public class Sender implements DataTransferProtocol {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {
 
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildClientHeader(
@@ -176,6 +183,7 @@ public class Sender implements DataTransferProtocol {
         .addAllTargets(PBHelperClient.convert(targets))
         .addAllTargetStorageTypes(
             PBHelperClient.convertStorageTypes(targetStorageTypes))
+        .addAllTargetStorageIds(Arrays.asList(targetStorageIds))
         .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
@@ -233,15 +241,18 @@ public class Sender implements DataTransferProtocol {
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException {
-    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+      final DatanodeInfo source,
+      final String storageId) throws IOException {
+    OpReplaceBlockProto.Builder proto = OpReplaceBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
         .setStorageType(PBHelperClient.convertStorageType(storageType))
         .setDelHint(delHint)
-        .setSource(PBHelperClient.convertDatanodeInfo(source))
-        .build();
+        .setSource(PBHelperClient.convertDatanodeInfo(source));
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }
 
-    send(out, Op.REPLACE_BLOCK, proto);
+    send(out, Op.REPLACE_BLOCK, proto.build());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 2b8f102..614f653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -345,6 +345,16 @@ public class PBHelperClient {
     return pinnings;
   }
 
+  public static List<String> convert(String[] targetIds, int idx) {
+    List<String> ids = new ArrayList<>();
+    if (targetIds != null) {
+      for (; idx < targetIds.length; ++idx) {
+        ids.add(targetIds[idx]);
+      }
+    }
+    return ids;
+  }
+
   public static ExtendedBlock convert(ExtendedBlockProto eb) {
     if (eb == null) return null;
     return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
@@ -640,6 +650,9 @@ public class PBHelperClient {
     for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
       builder.addStorageTypes(convertStorageType(storageType));
     }
+    for (String storageId : blockTokenSecret.getStorageIds()) {
+      builder.addStorageIds(storageId);
+    }
     return builder.build();
   }
 

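The new convert(String[], int) helper mirrors the existing boolean[] pinnings converter: it copies entries from index idx onward and tolerates a null array. A small sketch of its behaviour with assumed array contents; with idx = 1, as used in Sender#writeBlock, the first entry is omitted because the ID of the first datanode travels separately in the storageId field.

String[] targetStorageIds = {"DS-first", "DS-second", "DS-third"};  // assumed IDs
List<String> downstream = PBHelperClient.convert(targetStorageIds, 1);
// downstream -> ["DS-second", "DS-third"]
List<String> none = PBHelperClient.convert(null, 1);
// none -> []  (an empty list, never null)
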
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 228a7b6..5950752 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -53,16 +53,19 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   private long blockId;
   private final EnumSet<AccessMode> modes;
   private StorageType[] storageTypes;
+  private String[] storageIds;
   private boolean useProto;
 
   private byte [] cache;
 
   public BlockTokenIdentifier() {
-    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, null,
+        false);
   }
 
   public BlockTokenIdentifier(String userId, String bpid, long blockId,
-      EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
+      EnumSet<AccessMode> modes, StorageType[] storageTypes,
+      String[] storageIds, boolean useProto) {
     this.cache = null;
     this.userId = userId;
     this.blockPoolId = bpid;
@@ -70,6 +73,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
     this.storageTypes = Optional.ofNullable(storageTypes)
                                 .orElse(StorageType.EMPTY_ARRAY);
+    this.storageIds = Optional.ofNullable(storageIds)
+                              .orElse(new String[0]);
     this.useProto = useProto;
   }
 
@@ -125,6 +130,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return storageTypes;
   }
 
+  public String[] getStorageIds() {
+    return storageIds;
+  }
+
   @Override
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
@@ -132,7 +141,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
         + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
         + this.getAccessModes() + ", storageTypes= "
-        + Arrays.toString(this.getStorageTypes()) + ")";
+        + Arrays.toString(this.getStorageTypes()) + ", storageIds= "
+        + Arrays.toString(this.getStorageIds()) + ")";
   }
 
   static boolean isEqual(Object a, Object b) {
@@ -151,7 +161,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
           && isEqual(this.blockPoolId, that.blockPoolId)
           && this.blockId == that.blockId
           && isEqual(this.modes, that.modes)
-          && Arrays.equals(this.storageTypes, that.storageTypes);
+          && Arrays.equals(this.storageTypes, that.storageTypes)
+          && Arrays.equals(this.storageIds, that.storageIds);
     }
     return false;
   }
@@ -161,7 +172,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
         ^ (userId == null ? 0 : userId.hashCode())
         ^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
-        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
+        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes))
+        ^ (storageIds == null ? 0 : Arrays.hashCode(storageIds));
   }
 
   /**
@@ -220,6 +232,14 @@ public class BlockTokenIdentifier extends TokenIdentifier {
       readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
     }
     storageTypes = readStorageTypes;
+
+    length = WritableUtils.readVInt(in);
+    String[] readStorageIds = new String[length];
+    for (int i = 0; i < length; i++) {
+      readStorageIds[i] = WritableUtils.readString(in);
+    }
+    storageIds = readStorageIds;
+
     useProto = false;
   }
 
@@ -248,6 +268,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
         .map(PBHelperClient::convertStorageType)
         .toArray(StorageType[]::new);
+    storageIds = blockTokenSecretProto.getStorageIdsList().stream()
+        .toArray(String[]::new);
     useProto = true;
   }
 
@@ -275,6 +297,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (StorageType type: storageTypes){
       WritableUtils.writeEnum(out, type);
     }
+    WritableUtils.writeVInt(out, storageIds.length);
+    for (String id: storageIds) {
+      WritableUtils.writeString(out, id);
+    }
   }
 
   @VisibleForTesting

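A sketch of the widened BlockTokenIdentifier constructor with illustrative values (the pool ID, block ID, and storage ID below are made up). The storage IDs now take part in equals() and hashCode() and are carried by both the legacy Writable encoding and the protobuf-based one.

BlockTokenIdentifier id = new BlockTokenIdentifier(
    "hdfs",                                       // userId
    "BP-0000000000-127.0.0.1-0000000000000",      // block pool id (assumed)
    1073741825L,                                  // block id (assumed)
    EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
    new StorageType[] {StorageType.DISK},
    new String[] {"DS-example-1234"},             // new: storage IDs
    true);                                        // use the protobuf format
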
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 889361a..2356201 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -125,12 +125,15 @@ message OpWriteBlockProto {
   //whether to pin the block, so Balancer won't move it.
   optional bool pinning = 14 [default = false];
   repeated bool targetPinnings = 15;
+  optional string storageId = 16;
+  repeated string targetStorageIds = 17;
 }
   
 message OpTransferBlockProto {
   required ClientOperationHeaderProto header = 1;
   repeated DatanodeInfoProto targets = 2;
   repeated StorageTypeProto targetStorageTypes = 3;
+  repeated string targetStorageIds = 4;
 }
 
 message OpReplaceBlockProto {
@@ -138,6 +141,7 @@ message OpReplaceBlockProto {
   required string delHint = 2;
   required DatanodeInfoProto source = 3;
   optional StorageTypeProto storageType = 4 [default = DISK];
+  optional string storageId = 5;
 }
 
 message OpCopyBlockProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 3e27427..08ed3c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -570,4 +570,5 @@ message BlockTokenSecretProto {
   optional uint64 blockId = 5;
   repeated AccessModeProto modes = 6;
   repeated StorageTypeProto storageTypes = 7;
+  repeated string storageIds = 8;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 08ab967..bab2e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -185,7 +187,9 @@ public abstract class Receiver implements DataTransferProtocol {
             CachingStrategy.newDefaultStrategy()),
           (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
           (proto.hasPinning() ? proto.getPinning(): false),
-          (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())));
+          (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
+          proto.getStorageId(),
+          proto.getTargetStorageIdsList().toArray(new String[0]));
     } finally {
      if (traceScope != null) traceScope.close();
     }
@@ -199,11 +203,18 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
+      final ExtendedBlock block =
+          PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock());
+      final StorageType[] targetStorageTypes =
+          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(),
+              targets.length);
+      transferBlock(block,
           PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
-          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+          targetStorageTypes,
+          proto.getTargetStorageIdsList().toArray(new String[0])
+      );
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -264,7 +275,8 @@ public abstract class Receiver implements DataTransferProtocol {
           PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelperClient.convert(proto.getHeader().getToken()),
           proto.getDelHint(),
-          PBHelperClient.convert(proto.getSource()));
+          PBHelperClient.convert(proto.getSource()),
+          proto.getStorageId());
     } finally {
       if (traceScope != null) traceScope.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 29fb73f..8400b4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -84,25 +84,27 @@ public class BlockPoolTokenSecretManager extends
   /**
    * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
    *                String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   *                StorageType[])}
+   *                StorageType[], String[])}
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }
 
   /**
    * See {@link BlockTokenSecretManager#checkAccess(Token, String,
    *                ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   *                StorageType[])}.
+   *                StorageType[], String[])}
    */
   public void checkAccess(Token<BlockTokenIdentifier> token,
       String userId, ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }
 
   /**
@@ -115,11 +117,13 @@ public class BlockPoolTokenSecretManager extends
 
   /**
    * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
-   *  StorageType[])}
+   *  StorageType[], String[])}.
    */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
-      EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
-    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
+      EnumSet<AccessMode> of, StorageType[] storageTypes, String[] storageIds)
+      throws IOException {
+    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes,
+        storageIds);
   }
   
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index f3bec83..6b54490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -247,18 +247,19 @@ public class BlockTokenSecretManager extends
   /** Generate an block token for current user */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
       EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
-    return generateToken(userID, block, modes, storageTypes);
+    return generateToken(userID, block, modes, storageTypes, storageIds);
   }
 
   /** Generate a block token for a specified user */
   public Token<BlockTokenIdentifier> generateToken(String userId,
       ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
+        .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
+        storageIds, useProto);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -272,10 +273,13 @@ public class BlockTokenSecretManager extends
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     checkAccess(id, userId, block, mode);
     if (storageTypes != null && storageTypes.length > 0) {
-      checkAccess(id.getStorageTypes(), storageTypes);
+      checkAccess(id.getStorageTypes(), storageTypes, "StorageTypes");
+    }
+    if (storageIds != null && storageIds.length > 0) {
+      checkAccess(id.getStorageIds(), storageIds, "StorageIDs");
     }
   }
 
@@ -309,30 +313,31 @@ public class BlockTokenSecretManager extends
   }
 
   /**
-   * Check if the requested StorageTypes match the StorageTypes in the
-   * BlockTokenIdentifier.
-   * Empty candidateStorageTypes specifiers mean 'all is permitted'. They
-   * would otherwise be nonsensical.
+   * Check if the requested values can be satisfied with the values in the
+   * BlockToken. This is intended for use with StorageTypes and StorageIDs.
+   *
+   * The current node can only verify that one of the requested storage
+   * [Type|ID]s is available locally; the rest will reside on other nodes.
    */
-  public static void checkAccess(StorageType[] candidateStorageTypes,
-      StorageType[] storageTypesRequested) throws InvalidToken {
-    if (storageTypesRequested.length == 0) {
-      throw new InvalidToken("The request has no StorageTypes. "
+  public static <T> void checkAccess(T[] candidates, T[] requested, String msg)
+      throws InvalidToken {
+    if (requested.length == 0) {
+      throw new InvalidToken("The request has no " + msg + ". "
           + "This is probably a configuration error.");
     }
-    if (candidateStorageTypes.length == 0) {
+    if (candidates.length == 0) {
       return;
     }
 
-    List<StorageType> unseenCandidates = new ArrayList<StorageType>();
-    unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
-    for (StorageType storageType : storageTypesRequested) {
-      final int index = unseenCandidates.indexOf(storageType);
+    List<T> unseenCandidates = new ArrayList<>();
+    unseenCandidates.addAll(Arrays.asList(candidates));
+    for (T req : requested) {
+      final int index = unseenCandidates.indexOf(req);
       if (index == -1) {
-        throw new InvalidToken("Block token with StorageTypes "
-            + Arrays.toString(candidateStorageTypes)
-            + " not valid for access with StorageTypes "
-            + Arrays.toString(storageTypesRequested));
+        throw new InvalidToken("Block token with " + msg + " "
+            + Arrays.toString(candidates)
+            + " not valid for access with " + msg + " "
+            + Arrays.toString(requested));
       }
       Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
       unseenCandidates.remove(unseenCandidates.size()-1);
@@ -342,7 +347,7 @@ public class BlockTokenSecretManager extends
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token
@@ -352,7 +357,7 @@ public class BlockTokenSecretManager extends
           "Unable to de-serialize block token identifier for user=" + userId
               + ", block=" + block + ", access mode=" + mode);
     }
-    checkAccess(id, userId, block, mode, storageTypes);
+    checkAccess(id, userId, block, mode, storageTypes, storageIds);
     if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't have the correct token password");

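The generalized check keeps the earlier StorageType semantics and applies them to any value type: an empty request is rejected outright, an empty candidate list in the token permits everything, and otherwise every requested value must be matched by a distinct candidate. A short sketch with assumed values:

String[] candidates = {"DS-1", "DS-2"};  // from the block token
BlockTokenSecretManager.checkAccess(
    candidates, new String[] {"DS-2"}, "StorageIDs");     // passes
BlockTokenSecretManager.checkAccess(
    new String[0], new String[] {"DS-9"}, "StorageIDs");  // passes: empty
                                                          // candidates permit all
BlockTokenSecretManager.checkAccess(
    candidates, new String[] {"DS-9"}, "StorageIDs");     // throws InvalidToken
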
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 91dc907..f855e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -357,7 +357,7 @@ public class Dispatcher {
             reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager(); 
         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
-            new StorageType[]{target.storageType});
+            new StorageType[]{target.storageType}, new String[0]);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
             unbufIn, km, accessToken, target.getDatanodeInfo());
         unbufOut = saslStreams.out;
@@ -411,7 +411,8 @@ public class Dispatcher {
     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
-          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
+          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
+          null);
     }
 
     /** Check whether to continue waiting for response */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 06bf07f..faf95b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -95,7 +95,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
 
   /** Get an access token for a block. */
   public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     if (!isBlockTokenEnabled) {
       return BlockTokenSecretManager.DUMMY_TOKEN;
     } else {
@@ -105,7 +105,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
       }
       return blockTokenSecretManager.generateToken(null, eb,
           EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
-              BlockTokenIdentifier.AccessMode.COPY), storageTypes);
+              BlockTokenIdentifier.AccessMode.COPY), storageTypes, storageIds);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e63930a..8f58e25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1283,13 +1283,15 @@ public class BlockManager implements BlockStatsMXBean {
           internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
           blockTokens[i] = blockTokenSecretManager.generateToken(
               NameNode.getRemoteUser().getShortUserName(),
-              internalBlock, EnumSet.of(mode), b.getStorageTypes());
+              internalBlock, EnumSet.of(mode), b.getStorageTypes(),
+              b.getStorageIDs());
         }
         sb.setBlockTokens(blockTokens);
       } else {
         b.setBlockToken(blockTokenSecretManager.generateToken(
             NameNode.getRemoteUser().getShortUserName(),
-            b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
+            b.getBlock(), EnumSet.of(mode), b.getStorageTypes(),
+            b.getStorageIDs()));
       }
     }
   }
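
For orientation only (not part of the patch): a minimal sketch of how the two ends of the token path line up once storage IDs are carried in the block token. The NameNode-side call mirrors the hunk above; the DataNode-side check mirrors the DataXceiver change further down. All variables (b, block, storageTypes, storageIds, and the two secret managers) are assumed to be in scope, as they are in the surrounding classes.

    // NameNode side: bind the located block's storage types and storage IDs
    // into the block token handed back to the client.
    Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(
        NameNode.getRemoteUser().getShortUserName(),
        b.getBlock(),
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
        b.getStorageTypes(),
        b.getStorageIDs());

    // DataNode side: the storage types and IDs named in the incoming write
    // request are presented to the same check alongside the token.
    blockPoolTokenSecretManager.checkAccess(token, null, block,
        BlockTokenIdentifier.AccessMode.WRITE, storageTypes, storageIds);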

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index e0daca7..042169a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -679,7 +679,8 @@ class BPOfferService {
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
       dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
-          bcmd.getTargets(), bcmd.getTargetStorageTypes());
+          bcmd.getTargets(), bcmd.getTargetStorageTypes(),
+          bcmd.getTargetStorageIDs());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
       //

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 00109e0..2ab4067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -151,7 +151,8 @@ class BlockReceiver implements Closeable {
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
-      final boolean pinning) throws IOException {
+      final boolean pinning,
+      final String storageId) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -197,6 +198,7 @@ class BlockReceiver implements Closeable {
             + "\n allowLazyPersist=" + allowLazyPersist + ", pinning=" + pinning
             + ", isClient=" + isClient + ", isDatanode=" + isDatanode
             + ", responseInterval=" + responseInterval
+            + ", storageID=" + (storageId != null ? storageId : "null")
         );
       }
 
@@ -204,11 +206,13 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler =
+            datanode.data.createTemporary(storageType, storageId, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, storageId,
+              block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
@@ -233,7 +237,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              datanode.data.createTemporary(storageType, storageId, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 66ef89a..2305e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1943,7 +1943,7 @@ public class DataNode extends ReconfigurableBase
         LOG.debug("Got: " + id.toString());
       }
       blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
-          null);
+          null, null);
     }
   }
 
@@ -2224,7 +2224,8 @@ public class DataNode extends ReconfigurableBase
 
   @VisibleForTesting
   void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
-      StorageType[] xferTargetStorageTypes) throws IOException {
+      StorageType[] xferTargetStorageTypes, String[] xferTargetStorageIDs)
+      throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
 
@@ -2281,17 +2282,19 @@ public class DataNode extends ReconfigurableBase
       LOG.info(bpReg + " Starting thread to transfer " + 
                block + " to " + xfersBuilder);                       
 
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
+      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
+          xferTargetStorageIDs, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
   void transferBlocks(String poolId, Block blocks[],
-      DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
+      DatanodeInfo[][] xferTargets, StorageType[][] xferTargetStorageTypes,
+      String[][] xferTargetStorageIDs) {
     for (int i = 0; i < blocks.length; i++) {
       try {
         transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
-            xferTargetStorageTypes[i]);
+            xferTargetStorageTypes[i], xferTargetStorageIDs[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -2395,6 +2398,7 @@ public class DataNode extends ReconfigurableBase
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
     final StorageType[] targetStorageTypes;
+    final private String[] targetStorageIds;
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
@@ -2406,8 +2410,8 @@ public class DataNode extends ReconfigurableBase
      * entire target list, the block, and the data.
      */
     DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
-        ExtendedBlock b, BlockConstructionStage stage,
-        final String clientname) {
+        String[] targetStorageIds, ExtendedBlock b,
+        BlockConstructionStage stage, final String clientname) {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
             + b + " (numBytes=" + b.getNumBytes() + ")"
@@ -2415,10 +2419,13 @@ public class DataNode extends ReconfigurableBase
             + ", clientname=" + clientname
             + ", targets=" + Arrays.asList(targets)
             + ", target storage types=" + (targetStorageTypes == null ? "[]" :
-            Arrays.asList(targetStorageTypes)));
+            Arrays.asList(targetStorageTypes))
+            + ", target storage IDs=" + (targetStorageIds == null ? "[]" :
+            Arrays.asList(targetStorageIds)));
       }
       this.targets = targets;
       this.targetStorageTypes = targetStorageTypes;
+      this.targetStorageIds = targetStorageIds;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -2456,7 +2463,7 @@ public class DataNode extends ReconfigurableBase
         //
         Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
             EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes);
+            targetStorageTypes, targetStorageIds);
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2477,10 +2484,13 @@ public class DataNode extends ReconfigurableBase
         DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
             .build();
 
+        String storageId = targetStorageIds.length > 0 ?
+            targetStorageIds[0] : null;
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null);
+            false, false, null, storageId,
+            targetStorageIds);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
@@ -2540,12 +2550,12 @@ public class DataNode extends ReconfigurableBase
    */
   public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
       EnumSet<AccessMode> mode,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     Token<BlockTokenIdentifier> accessToken = 
         BlockTokenSecretManager.DUMMY_TOKEN;
     if (isBlockTokenEnabled) {
       accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
-          storageTypes);
+          storageTypes, storageIds);
     }
     return accessToken;
   }
@@ -2918,7 +2928,7 @@ public class DataNode extends ReconfigurableBase
           LOG.debug("Got: " + id.toString());
         }
         blockPoolTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenIdentifier.AccessMode.READ, null);
+            BlockTokenIdentifier.AccessMode.READ, null, null);
       }
     }
   }
@@ -2934,7 +2944,8 @@ public class DataNode extends ReconfigurableBase
    */
   void transferReplicaForPipelineRecovery(final ExtendedBlock b,
       final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
-      final String client) throws IOException {
+      final String[] targetStorageIds, final String client)
+      throws IOException {
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2967,7 +2978,8 @@ public class DataNode extends ReconfigurableBase
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
+      new DataTransfer(targets, targetStorageTypes, targetStorageIds, b, stage,
+          client).run();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index cc13799..d42e330 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -354,7 +354,8 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
     DataOutputStream out = getBufferedOutputStream();
     checkAccess(out, true, blk, token,
-        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
+        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ,
+        null, null);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
     SlotId registeredSlotId = null;
@@ -662,7 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientname,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
+      final StorageType[] targetStorageTypes,
       final DatanodeInfo srcDataNode,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -673,7 +674,9 @@ class DataXceiver extends Receiver implements Runnable {
       CachingStrategy cachingStrategy,
       boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -692,8 +695,15 @@ class DataXceiver extends Receiver implements Runnable {
     if (targetStorageTypes.length > 0) {
       System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
     }
+    int nsi = targetStorageIds.length;
+    String[] storageIds = new String[nsi + 1];
+    storageIds[0] = storageId;
+    if (targetStorageTypes.length > 0) {
+      System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
+    }
     checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
-        BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
+        BlockTokenIdentifier.AccessMode.WRITE,
+        storageTypes, storageIds);
 
     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
@@ -743,7 +753,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist, pinning));
+            cachingStrategy, allowLazyPersist, pinning, storageId));
         replica = blockReceiver.getReplica();
       } else {
         replica = datanode.data.recoverClose(
@@ -796,16 +806,18 @@ class DataXceiver extends Receiver implements Runnable {
 
           if (targetPinnings != null && targetPinnings.length > 0) {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy,
-                allowLazyPersist, targetPinnings[0], targetPinnings);
+                blockToken, clientname, targets, targetStorageTypes,
+                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                allowLazyPersist, targetPinnings[0], targetPinnings,
+                targetStorageIds[0], targetStorageIds);
           } else {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy,
-                allowLazyPersist, false, targetPinnings);
+                blockToken, clientname, targets, targetStorageTypes,
+                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+                latestGenerationStamp, requestedChecksum, cachingStrategy,
+                allowLazyPersist, false, targetPinnings,
+                targetStorageIds[0], targetStorageIds);
           }
 
           mirrorOut.flush();
@@ -929,17 +941,19 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
-        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
+        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes,
+        targetStorageIds);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
-          targetStorageTypes, clientName);
+          targetStorageTypes, targetStorageIds, clientName);
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
@@ -1105,12 +1119,14 @@ class DataXceiver extends Receiver implements Runnable {
       final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo proxySource) throws IOException {
+      final DatanodeInfo proxySource,
+      final String storageId) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     checkAccess(replyOut, true, block, blockToken,
         Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
-        new StorageType[]{ storageType });
+        new StorageType[]{storageType},
+        new String[]{storageId});
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1131,7 +1147,7 @@ class DataXceiver extends Receiver implements Runnable {
       // Move the block to different storage in the same datanode
       if (proxySource.equals(datanode.getDatanodeId())) {
         ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
-            storageType);
+            storageType, storageId);
         if (oldReplica != null) {
           LOG.info("Moved " + block + " from StorageType "
               + oldReplica.getVolume().getStorageType() + " to " + storageType);
@@ -1188,7 +1204,7 @@ class DataXceiver extends Receiver implements Runnable {
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false, false));
+            CachingStrategy.newDropBehind(), false, false, storageId));
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 
@@ -1258,11 +1274,12 @@ class DataXceiver extends Receiver implements Runnable {
       final DataNode dn, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
-      final boolean pinning) throws IOException {
+      final boolean pinning,
+      final String storageId) throws IOException {
     return new BlockReceiver(block, storageType, in,
         inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
         clientname, srcDataNode, dn, requestedChecksum,
-        cachingStrategy, allowLazyPersist, pinning);
+        cachingStrategy, allowLazyPersist, pinning, storageId);
   }
 
   /**
@@ -1365,7 +1382,7 @@ class DataXceiver extends Receiver implements Runnable {
   private void checkAccess(OutputStream out, final boolean reply,
       ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
       BlockTokenIdentifier.AccessMode mode) throws IOException {
-    checkAccess(out, reply, blk, t, op, mode, null);
+    checkAccess(out, reply, blk, t, op, mode, null, null);
   }
 
   private void checkAccess(OutputStream out, final boolean reply,
@@ -1373,7 +1390,8 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> t,
       final Op op,
       final BlockTokenIdentifier.AccessMode mode,
-      final StorageType[] storageTypes) throws IOException {
+      final StorageType[] storageTypes,
+      final String[] storageIds) throws IOException {
     checkAndWaitForBP(blk);
     if (datanode.isBlockTokenEnabled) {
       if (LOG.isDebugEnabled()) {
@@ -1382,7 +1400,7 @@ class DataXceiver extends Receiver implements Runnable {
       }
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
-            storageTypes);
+            storageTypes, storageIds);
       } catch(InvalidToken e) {
         try {
           if (reply) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 1492e5d..e076dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -111,7 +111,8 @@ public final class ErasureCodingWorker {
             new StripedReconstructionInfo(
             reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
             reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
-            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
+            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
+            reconInfo.getTargetStorageIDs());
         final StripedBlockReconstructor task =
             new StripedBlockReconstructor(this, stripedReconInfo);
         if (task.hasValidTargets()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index b3884c2..39ef67e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -110,7 +110,7 @@ class StripedBlockReader {
           stripedReader.getSocketAddress4Transfer(source);
       Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
           block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
-          StorageType.EMPTY_ARRAY);
+          StorageType.EMPTY_ARRAY, new String[0]);
         /*
          * This can be further improved if the replica is local, then we can
          * read directly from DN and need to check the replica is FINALIZED

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index a6989d4..24c1d61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -61,6 +61,7 @@ class StripedBlockWriter {
   private final ExtendedBlock block;
   private final DatanodeInfo target;
   private final StorageType storageType;
+  private final String storageId;
 
   private Socket targetSocket;
   private DataOutputStream targetOutputStream;
@@ -72,8 +73,8 @@ class StripedBlockWriter {
 
   StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
                      Configuration conf, ExtendedBlock block,
-                     DatanodeInfo target, StorageType storageType)
-      throws IOException {
+                     DatanodeInfo target, StorageType storageType,
+                     String storageId) throws IOException {
     this.stripedWriter = stripedWriter;
     this.datanode = datanode;
     this.conf = conf;
@@ -81,6 +82,7 @@ class StripedBlockWriter {
     this.block = block;
     this.target = target;
     this.storageType = storageType;
+    this.storageId = storageId;
 
     this.targetBuffer = stripedWriter.allocateWriteBuffer();
 
@@ -117,7 +119,7 @@ class StripedBlockWriter {
       Token<BlockTokenIdentifier> blockToken =
           datanode.getBlockAccessToken(block,
               EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-              new StorageType[]{storageType});
+              new StorageType[]{storageType}, new String[]{storageId});
 
       long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
       OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
@@ -141,7 +143,7 @@ class StripedBlockWriter {
           new StorageType[]{storageType}, source,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
           stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(),
-          false, false, null);
+          false, false, null, storageId, new String[]{storageId});
 
       targetSocket = socket;
       targetOutputStream = out;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
index a5c328b..a619c34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -40,24 +40,27 @@ public class StripedReconstructionInfo {
   private final byte[] targetIndices;
   private final DatanodeInfo[] targets;
   private final StorageType[] targetStorageTypes;
+  private final String[] targetStorageIds;
 
   public StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
       byte[] targetIndices) {
-    this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null);
+    this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
+        null, null);
   }
 
   StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
-      DatanodeInfo[] targets, StorageType[] targetStorageTypes) {
+      DatanodeInfo[] targets, StorageType[] targetStorageTypes,
+      String[] targetStorageIds) {
     this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
-        targetStorageTypes);
+        targetStorageTypes, targetStorageIds);
   }
 
   private StripedReconstructionInfo(ExtendedBlock blockGroup,
       ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
       byte[] targetIndices, DatanodeInfo[] targets,
-      StorageType[] targetStorageTypes) {
+      StorageType[] targetStorageTypes, String[] targetStorageIds) {
 
     this.blockGroup = blockGroup;
     this.ecPolicy = ecPolicy;
@@ -66,6 +69,7 @@ public class StripedReconstructionInfo {
     this.targetIndices = targetIndices;
     this.targets = targets;
     this.targetStorageTypes = targetStorageTypes;
+    this.targetStorageIds = targetStorageIds;
   }
 
   ExtendedBlock getBlockGroup() {
@@ -95,5 +99,9 @@ public class StripedReconstructionInfo {
   StorageType[] getTargetStorageTypes() {
     return targetStorageTypes;
   }
+
+  String[] getTargetStorageIds() {
+    return targetStorageIds;
+  }
 }
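
An illustrative sketch (not from the patch) of building the reconstruction descriptor with the new field, in the style of the ErasureCodingWorker call above. The targets, targetStorageTypes and targetStorageIds arrays are parallel and are indexed together by StripedWriter, so they are assumed to have the same length; all variables here are assumed to be in scope inside the erasurecode package.

    StripedReconstructionInfo info = new StripedReconstructionInfo(
        blockGroup, ecPolicy, liveIndices, sources,
        targets,              // DatanodeInfo[]  reconstruction targets
        targetStorageTypes,   // StorageType[]   one entry per target
        targetStorageIds);    // String[]        one entry per target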
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index 225a7ed..762506c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -55,6 +55,7 @@ class StripedWriter {
   private final short[] targetIndices;
   private boolean hasValidTargets;
   private final StorageType[] targetStorageTypes;
+  private final String[] targetStorageIds;
 
   private StripedBlockWriter[] writers;
 
@@ -77,6 +78,8 @@ class StripedWriter {
     assert targets != null;
     this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
     assert targetStorageTypes != null;
+    this.targetStorageIds = stripedReconInfo.getTargetStorageIds();
+    assert targetStorageIds != null;
 
     writers = new StripedBlockWriter[targets.length];
 
@@ -192,7 +195,7 @@ class StripedWriter {
   private StripedBlockWriter createWriter(short index) throws IOException {
     return new StripedBlockWriter(this, datanode, conf,
         reconstructor.getBlock(targetIndices[index]), targets[index],
-        targetStorageTypes[index]);
+        targetStorageTypes[index], targetStorageIds[index]);
   }
 
   ByteBuffer allocateWriteBuffer() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index 39d9547..efe222f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -113,8 +113,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       new RoundRobinVolumeChoosingPolicy<V>();
 
   @Override
-  public V chooseVolume(List<V> volumes,
-      long replicaSize) throws IOException {
+  public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+      throws IOException {
     if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -125,19 +125,20 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
             storageType.ordinal() : StorageType.DEFAULT.ordinal();
 
     synchronized (syncLocks[index]) {
-      return doChooseVolume(volumes, replicaSize);
+      return doChooseVolume(volumes, replicaSize, storageId);
     }
   }
 
-  private V doChooseVolume(final List<V> volumes,
-                         long replicaSize) throws IOException {
+  private V doChooseVolume(final List<V> volumes, long replicaSize,
+      String storageId) throws IOException {
     AvailableSpaceVolumeList volumesWithSpaces =
         new AvailableSpaceVolumeList(volumes);
     
     if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
       // If they're actually not too far out of whack, fall back on pure round
       // robin.
-      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
+      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize,
+          storageId);
       if (LOG.isDebugEnabled()) {
         LOG.debug("All volumes are within the configured free space balance " +
             "threshold. Selecting " + volume + " for write of block size " +
@@ -165,7 +166,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       if (mostAvailableAmongLowVolumes < replicaSize ||
           random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
-            highAvailableVolumes, replicaSize);
+            highAvailableVolumes, replicaSize, storageId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from high available space volumes for write of block size "
@@ -173,7 +174,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
         }
       } else {
         volume = roundRobinPolicyLowAvailable.chooseVolume(
-            lowAvailableVolumes, replicaSize);
+            lowAvailableVolumes, replicaSize, storageId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from low available space volumes for write of block size "
@@ -266,7 +267,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
   
   /**
    * Used so that we only check the available space on a given volume once, at
-   * the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}.
+   * the beginning of
+   * {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}.
    */
   private class AvailableSpaceVolumePair {
     private final V volume;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 9e979f7..d7e29cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -318,7 +318,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  ReplicaHandler createTemporary(StorageType storageType,
+  ReplicaHandler createTemporary(StorageType storageType, String storageId,
       ExtendedBlock b) throws IOException;
 
   /**
@@ -328,7 +328,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  ReplicaHandler createRbw(StorageType storageType,
+  ReplicaHandler createRbw(StorageType storageType, String storageId,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
@@ -623,7 +623,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
      * Move block from one storage to another storage
      */
    ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
-        StorageType targetStorageType) throws IOException;
+        StorageType targetStorageType, String storageId) throws IOException;
 
   /**
    * Set a block to be pinned on this datanode so that it cannot be moved
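
A hedged usage sketch (not part of this change) of the two write-path entry points with the new parameter, as a BlockReceiver-style caller would invoke them. The dataset (FsDatasetSpi), block (ExtendedBlock) and storageId variables are assumed to be in scope; storageId is the ID nominated by the NameNode and may be null, in which case the volume choosing policy falls back to its own selection.

    // Client write pipeline: create a replica-being-written on the nominated
    // storage if the policy honors it.
    ReplicaHandler rbw =
        dataset.createRbw(StorageType.DISK, storageId, block, false);

    // Replication or move destination: create a temporary replica instead.
    ReplicaHandler tmp =
        dataset.createTemporary(StorageType.DISK, storageId, block);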

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 9474b92..b9bcf1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -50,7 +50,7 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
   }
 
   @Override
-  public V chooseVolume(final List<V> volumes, long blockSize)
+  public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
       throws IOException {
 
     if (volumes.size() < 1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
index 62b1e75..8cbc058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
@@ -36,8 +36,11 @@ public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
    * 
    * @param volumes - a list of available volumes.
    * @param replicaSize - the size of the replica for which a volume is sought.
+   * @param storageId - the storage id of the Volume nominated by the namenode.
+   *                  This can usually be ignored by the VolumeChoosingPolicy.
    * @return the chosen volume.
    * @throws IOException when disks are unavailable or are full.
    */
-  public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
+  V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+      throws IOException;
 }
\ No newline at end of file
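
For illustration only (not part of this change): a hypothetical policy, here called StorageIdPreferringPolicy, that honors the storage ID nominated by the NameNode when a matching volume is available and otherwise falls back to round-robin. It assumes FsVolumeSpi exposes the volume's storage ID via getStorageID(); a production policy would also verify free space before returning the nominated volume.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;

    /** Hypothetical policy: prefer the volume nominated by the NameNode. */
    public class StorageIdPreferringPolicy<V extends FsVolumeSpi>
        implements VolumeChoosingPolicy<V> {

      private final RoundRobinVolumeChoosingPolicy<V> fallback =
          new RoundRobinVolumeChoosingPolicy<>();

      @Override
      public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
          throws IOException {
        if (storageId != null) {
          for (V v : volumes) {
            // Prefer the nominated volume when it is among the candidates.
            // A real policy would also confirm it has replicaSize bytes free.
            if (storageId.equals(v.getStorageID())) {
              return v;
            }
          }
        }
        // Nominated volume absent (or no nomination): fall back to round-robin.
        return fallback.chooseVolume(volumes, replicaSize, storageId);
      }
    }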

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 169e0e6..9a5002a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -927,7 +927,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
-      StorageType targetStorageType) throws IOException {
+      StorageType targetStorageType, String targetStorageId)
+      throws IOException {
     ReplicaInfo replicaInfo = getReplicaInfo(block);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
       throw new ReplicaNotFoundException(
@@ -952,7 +953,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
+      volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
+          block.getNumBytes());
     }
     try {
       moveBlock(block, replicaInfo, volumeRef);
@@ -1298,11 +1300,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   @Override // FsDatasetSpi
   public ReplicaHandler createRbw(
-      StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
-      throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b,
+      boolean allowLazyPersist) throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
@@ -1335,7 +1337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       if (ref == null) {
-        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+        ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
       }
 
       FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1503,7 +1505,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b)
+      throws IOException {
     long startTimeMs = Time.monotonicNow();
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
@@ -1516,7 +1519,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
           }
           FsVolumeReference ref =
-              volumes.getNextVolume(storageType, b.getNumBytes());
+              volumes.getNextVolume(storageType, storageId, b.getNumBytes());
           FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
           ReplicaInPipeline newReplicaInfo;
           try {
@@ -2899,7 +2902,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                 replicaInfo.getVolume().isTransientStorage()) {
               // Pick a target volume to persist the block.
               targetReference = volumes.getNextVolume(
-                  StorageType.DEFAULT, replicaInfo.getNumBytes());
+                  StorageType.DEFAULT, null, replicaInfo.getNumBytes());
               targetVolume = (FsVolumeImpl) targetReference.getVolume();
 
               ramDiskReplicaTracker.recordStartLazyPersist(

