You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/05/11 16:58:46 UTC

[05/50] [abbrv] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index e7f0228..75baf84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -81,10 +81,11 @@ class FsVolumeList {
     return Collections.unmodifiableList(volumes);
   }
 
-  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
-      throws IOException {
+  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
+      long blockSize, String storageId) throws IOException {
     while (true) {
-      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
+      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
+          storageId);
       try {
         return volume.obtainReference();
       } catch (ClosedChannelException e) {
@@ -100,18 +101,20 @@ class FsVolumeList {
    * Get next volume.
    *
    * @param blockSize free space needed on the volume
-   * @param storageType the desired {@link StorageType} 
+   * @param storageType the desired {@link StorageType}
+   * @param storageId the storage id which may or may not be used by
+   *                  the VolumeChoosingPolicy.
    * @return next volume to store the block in.
    */
-  FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
-      throws IOException {
+  FsVolumeReference getNextVolume(StorageType storageType, String storageId,
+      long blockSize) throws IOException {
     final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
     for(FsVolumeImpl v : volumes) {
       if (v.getStorageType() == storageType) {
         list.add(v);
       }
     }
-    return chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize, storageId);
   }
 
   /**
@@ -129,7 +132,7 @@ class FsVolumeList {
         list.add(v);
       }
     }
-    return chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize, null);
   }
 
   long getDfsUsed() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 74cdeae..c98a336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1018,7 +1018,8 @@ public class DFSTestUtil {
       // send the request
       new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
           dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
-          new StorageType[]{StorageType.DEFAULT});
+          new StorageType[]{StorageType.DEFAULT},
+          new String[0]);
       out.flush();
 
       return BlockOpResponseProto.parseDelimitedFrom(in);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index b6884da..3a8fb59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -1448,12 +1448,33 @@ public class TestBlockStoragePolicy {
     testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
         StorageType.SSD, StorageType.ARCHIVE},
         new StorageType[]{StorageType.DISK}, false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.DISK, StorageType.SSD},
+        new StorageType[]{StorageType.SSD},
+        true);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK},
+        new StorageType[]{StorageType.DISK}, false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE},
+        new StorageType[]{StorageType.DISK},
+        false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE},
+        new StorageType[]{StorageType.DISK},
+        false);
+
   }
 
   private void testStorageTypeCheckAccessResult(StorageType[] requested,
       StorageType[] allowed, boolean expAccess) {
     try {
-      BlockTokenSecretManager.checkAccess(requested, allowed);
+      BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes");
       if (!expAccess) {
         fail("No expected access with allowed StorageTypes "
             + Arrays.toString(allowed) + " and requested StorageTypes "
@@ -1467,4 +1488,56 @@ public class TestBlockStoragePolicy {
       }
     }
   }
+
+  @Test
+  public void testStorageIDCheckAccess() {
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1"},
+        new String[]{"DN1-Storage1"}, true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+        new String[]{"DN1-Storage1"},
+        true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+        new String[]{"DN1-Storage1", "DN1-Storage2"}, false);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1", "DN1-Storage2"},
+        new String[]{"DN1-Storage1"}, true);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage1", "DN1-Storage2"},
+        new String[]{"DN2-Storage1"}, false);
+
+    testStorageIDCheckAccessResult(
+        new String[]{"DN1-Storage2", "DN2-Storage2"},
+        new String[]{"DN1-Storage1", "DN2-Storage1"}, false);
+
+    testStorageIDCheckAccessResult(new String[0], new String[0], false);
+
+    testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"},
+        true);
+
+    testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0],
+        false);
+  }
+
+  private void testStorageIDCheckAccessResult(String[] requested,
+          String[] allowed, boolean expAccess) {
+    try {
+      BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs");
+      if (!expAccess) {
+        fail("No expected access with allowed StorageIDs"
+            + Arrays.toString(allowed) + " and requested StorageIDs"
+            + Arrays.toString(requested));
+      }
+    } catch (SecretManager.InvalidToken e) {
+      if (expAccess) {
+        fail("Expected access with allowed StorageIDs "
+            + Arrays.toString(allowed) + " and requested StorageIDs"
+            + Arrays.toString(requested));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 3f4fe28..7a2ac1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -559,6 +559,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false,
+        null, null, new String[0]);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..e159914 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,11 +98,11 @@ public class TestWriteBlockGetsBlockLengthHint {
      * correctly propagate the hint to FsDatasetSpi.
      */
     @Override
-    public synchronized ReplicaHandler createRbw(
-        StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+    public synchronized ReplicaHandler createRbw(StorageType storageType,
+        String storageId, ExtendedBlock b, boolean allowLazyPersist)
         throws IOException {
       assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
-      return super.createRbw(storageType, b, allowLazyPersist);
+      return super.createRbw(storageType, storageId, b, allowLazyPersist);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index e98207f..747f295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -151,7 +151,7 @@ public class TestBlockToken {
         assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
         sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
             BlockTokenIdentifier.AccessMode.WRITE,
-            new StorageType[]{StorageType.DEFAULT});
+            new StorageType[]{StorageType.DEFAULT}, null);
         result = id.getBlockId();
       }
       return GetReplicaVisibleLengthResponseProto.newBuilder()
@@ -160,11 +160,11 @@ public class TestBlockToken {
   }
 
   private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
-                                               ExtendedBlock block,
-                                               EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
-                                               StorageType... storageTypes) throws IOException {
+      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
+      StorageType[] storageTypes, String[] storageIds)
+      throws IOException {
     Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
-        storageTypes);
+        storageTypes, storageIds);
     BlockTokenIdentifier id = sm.createIdentifier();
     id.readFields(new DataInputStream(new ByteArrayInputStream(token
         .getIdentifier())));
@@ -178,29 +178,28 @@ public class TestBlockToken {
         enableProtobuf);
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block2,
         EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.DEFAULT));
+        new StorageType[]{StorageType.DEFAULT}, null));
     // We must be backwards compatible when adding storageType
     TestWritable.testWritable(generateTokenId(sm, block3,
-        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        (StorageType[]) null));
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
     TestWritable.testWritable(generateTokenId(sm, block3,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.EMPTY_ARRAY));
+        StorageType.EMPTY_ARRAY, null));
   }
 
   @Test
@@ -215,35 +214,36 @@ public class TestBlockToken {
 
   private static void checkAccess(BlockTokenSecretManager m,
       Token<BlockTokenIdentifier> t, ExtendedBlock blk,
-      BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
-    m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
+      BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
+      String[] storageIds) throws SecretManager.InvalidToken {
+    m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
   }
 
   private void tokenGenerationAndVerification(BlockTokenSecretManager master,
-      BlockTokenSecretManager slave, StorageType... storageTypes)
-      throws Exception {
+      BlockTokenSecretManager slave, StorageType[] storageTypes,
+      String[] storageIds) throws Exception {
     // single-mode tokens
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
       // generated by master
       Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
-          EnumSet.of(mode), storageTypes);
-      checkAccess(master, token1, block1, mode);
-      checkAccess(slave, token1, block1, mode);
+          EnumSet.of(mode), storageTypes, storageIds);
+      checkAccess(master, token1, block1, mode, storageTypes, storageIds);
+      checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
       // generated by slave
       Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
-          EnumSet.of(mode), storageTypes);
-      checkAccess(master, token2, block2, mode);
-      checkAccess(slave, token2, block2, mode);
+          EnumSet.of(mode), storageTypes, storageIds);
+      checkAccess(master, token2, block2, mode, storageTypes, storageIds);
+      checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
     }
     // multi-mode tokens
     Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        storageTypes);
+        storageTypes, storageIds);
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
-      checkAccess(master, mtoken, block3, mode);
-      checkAccess(slave, mtoken, block3, mode);
+      checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
+      checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
     }
   }
 
@@ -259,18 +259,18 @@ public class TestBlockToken {
     ExportedBlockKeys keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
     // key updating
     masterHandler.updateKeys();
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
     keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler,
-        StorageType.DEFAULT);
-    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+        new StorageType[]{StorageType.DEFAULT}, null);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
   }
 
   @Test
@@ -315,7 +315,7 @@ public class TestBlockToken {
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
 
     final Server server = createMockDatanode(sm, token, conf);
 
@@ -365,7 +365,7 @@ public class TestBlockToken {
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
 
     final Server server = createMockDatanode(sm, token, conf);
     server.start();
@@ -451,19 +451,23 @@ public class TestBlockToken {
 
       ExportedBlockKeys keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
+      String[] storageIds = new String[] {"DS-9001"};
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, storageIds);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
       // Test key updating
       masterHandler.updateKeys();
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, storageIds);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
       keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
-          StorageType.DEFAULT);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+          new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+          null);
     }
   }
 
@@ -540,7 +544,7 @@ public class TestBlockToken {
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DEFAULT});
+        new StorageType[]{StorageType.DEFAULT}, new String[0]);
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -605,7 +609,7 @@ public class TestBlockToken {
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
-        StorageType.EMPTY_ARRAY);
+        StorageType.EMPTY_ARRAY, new String[0]);
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -699,7 +703,8 @@ public class TestBlockToken {
      */
     BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
         "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
-        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
+        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+        new String[] {"fake-storage-id"}, true);
     Calendar cal = new GregorianCalendar();
     cal.set(2017, 1, 9, 0, 12, 35);
     long datetime = cal.getTimeInMillis();
@@ -749,7 +754,8 @@ public class TestBlockToken {
         new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
             StorageType.DISK, StorageType.ARCHIVE};
     BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
-        123, accessModes, storageTypes, useProto);
+        123, accessModes, storageTypes, new String[] {"fake-storage-id"},
+        useProto);
     ident.setExpiryDate(1487080345L);
     BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
     assertEquals(ret.getExpiryDate(), 1487080345L);
@@ -760,6 +766,7 @@ public class TestBlockToken {
     assertEquals(ret.getAccessModes(),
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
     assertArrayEquals(ret.getStorageTypes(), storageTypes);
+    assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"});
   }
 
   @Test
@@ -767,5 +774,4 @@ public class TestBlockToken {
     testBlockTokenSerialization(false);
     testBlockTokenSerialization(true);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 6810a0b..c9ff572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -389,7 +389,7 @@ public abstract class BlockReportTestBase {
     // Create a bogus new block which will not be present on the namenode.
     ExtendedBlock b = new ExtendedBlock(
         poolId, rand.nextLong(), 1024L, rand.nextLong());
-    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
+    dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false);
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cd3befd..18b4922 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1023,21 +1023,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createRbw(
-      StorageType storageType, ExtendedBlock b,
+      StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    return createTemporary(storageType, b);
+    return createTemporary(storageType, storageId, b);
   }
 
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, String storageId, ExtendedBlock b)
+      throws IOException {
     if (isValidBlock(b)) {
-          throw new ReplicaAlreadyExistsException("Block " + b + 
-              " is valid, and cannot be written to.");
-      }
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " is valid, and cannot be written to.");
+    }
     if (isValidRbw(b)) {
-        throw new ReplicaAlreadyExistsException("Block " + b + 
-            " is being written, and cannot be written to.");
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " is being written, and cannot be written to.");
     }
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
@@ -1419,7 +1420,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
-      StorageType targetStorageType) throws IOException {
+      StorageType targetStorageType, String storageId) throws IOException {
     // TODO Auto-generated method stub
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 579252b..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -647,7 +647,7 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    dn.data.createRbw(StorageType.DEFAULT, block, false);
+    dn.data.createRbw(StorageType.DEFAULT, null, block, false);
     BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
         recoveryWorker.new RecoveryTaskContiguous(rBlock);
     try {
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipeline replicaInfo = dn.data.createRbw(
-        StorageType.DEFAULT, block, false).getReplica();
+        StorageType.DEFAULT, null, block, false).getReplica();
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
@@ -972,7 +972,7 @@ public class TestBlockRecovery {
           // Register this thread as the writer for the recoveringBlock.
           LOG.debug("slowWriter creating rbw");
           ReplicaHandler replicaHandler =
-              spyDN.data.createRbw(StorageType.DISK, block, false);
+              spyDN.data.createRbw(StorageType.DISK, null, block, false);
           replicaHandler.close();
           LOG.debug("slowWriter created rbw");
           // Tell the parent thread to start progressing.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index f811bd8..8992d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -394,7 +394,7 @@ public class TestBlockReplacement {
       DataOutputStream out = new DataOutputStream(sock.getOutputStream());
       new Sender(out).replaceBlock(block, targetStorageType,
           BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
-          sourceProxy);
+          sourceProxy, null);
       out.flush();
       // receiveResponse
       DataInputStream reply = new DataInputStream(sock.getInputStream());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index b2bfe49..8fda664 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -129,7 +129,7 @@ public class TestDataXceiverLazyPersistHint {
         DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
         CachingStrategy.newDefaultStrategy(),
         lazyPersist,
-        false, null);
+        false, null, null, new String[0]);
   }
 
   // Helper functions to setup the mock objects.
@@ -151,7 +151,7 @@ public class TestDataXceiverLazyPersistHint {
         any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
         anyString(), any(DatanodeInfo.class), any(DataNode.class),
         any(DataChecksum.class), any(CachingStrategy.class),
-        captor.capture(), anyBoolean());
+        captor.capture(), anyBoolean(), any(String.class));
     doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
         .getBufferedOutputStream();
     return xceiverSpy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index cd86720..38e4287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -167,7 +167,8 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false,
+        null, null, new String[0]);
     out.flush();
 
     // close the connection before sending the content of the block
@@ -274,7 +275,7 @@ public class TestDiskError {
             dn1.getDatanodeId());
 
     dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
-        new StorageType[]{StorageType.DISK});
+        new StorageType[]{StorageType.DISK}, new String[0]);
     // Sleep for 1 second so the DataTrasnfer daemon can start transfer.
     try {
       Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..2e69595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -81,7 +81,7 @@ public class TestSimulatedFSDataset {
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipeline bInfo = fsdataset.createRbw(
-          StorageType.DEFAULT, b, false).getReplica();
+          StorageType.DEFAULT, null, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
           ExtendedBlock block = new ExtendedBlock(newbpid,1);
           try {
             // it will throw an exception if the block pool is not found
-            fsdataset.createTemporary(StorageType.DEFAULT, block);
+            fsdataset.createTemporary(StorageType.DEFAULT, null, block);
           } catch (IOException ioe) {
             // JUnit does not capture exception in non-main thread,
             // so cache it and then let main thread throw later.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 62ef731..2e439d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -138,14 +138,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType t, String i,
+      ExtendedBlock b)
       throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
-  public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
-      throws IOException {
+  public ReplicaHandler createRbw(StorageType storageType, String id,
+      ExtendedBlock b, boolean tf) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
@@ -332,7 +333,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType, String storageId) throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
index 9414a0e..24a43e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
@@ -89,10 +89,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -115,21 +117,29 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // Third volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
 
     // All writes should be assigned to the volume with the least free space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -156,22 +166,30 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // Fourth volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
 
     // We should alternate assigning between the two volumes with less free
     // space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+         null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   
   @Test(timeout=60000)
@@ -190,13 +208,14 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-    
+
     // All writes should be assigned to the volume with the least free space.
     // However, if the volume with the least free space doesn't have enough
     // space to accept the replica size, and another volume does have enough
     // free space, that should be chosen instead.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        1024L * 1024L * 2, null));
   }
   
   @Test(timeout=60000)
@@ -220,10 +239,11 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
-    
+
     // Should still be able to get a volume for the replica even though the
     // available space on the second volume changed.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        100, null));
   }
   
   @Test(timeout=60000)
@@ -271,12 +291,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
       Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
       volumes.add(volume);
     }
-    
+
     initPolicy(policy, preferencePercent);
     long lowAvailableSpaceVolumeSelected = 0;
     long highAvailableSpaceVolumeSelected = 0;
     for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
-      FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
+      FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null);
       for (int j = 0; j < volumes.size(); j++) {
         // Note how many times the first low available volume was selected
         if (volume == volumes.get(j) && j == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
index 9b3047f..44e2a30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
@@ -50,20 +50,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
     // Second volume, with 200 bytes of space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-    
+
     // Test two rounds of round-robin choosing
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
 
     // The first volume has only 100L space, so the policy should
     // wisely choose the second one in case we ask for more.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150,
+        null));
 
     // Fail if no volume can be chosen?
     try {
-      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      policy.chooseVolume(volumes, Long.MAX_VALUE, null);
       Assert.fail();
     } catch (IOException e) {
       // Passed.
@@ -93,7 +94,7 @@ public class TestRoundRobinVolumeChoosingPolicy {
 
     int blockSize = 700;
     try {
-      policy.chooseVolume(volumes, blockSize);
+      policy.chooseVolume(volumes, blockSize, null);
       Assert.fail("expected to throw DiskOutOfSpaceException");
     } catch(DiskOutOfSpaceException e) {
       Assert.assertEquals("Not returnig the expected message",
@@ -137,21 +138,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
     Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
 
     Assert.assertEquals(diskVolumes.get(0),
-            policy.chooseVolume(diskVolumes, 0));
+            policy.chooseVolume(diskVolumes, 0, null));
     // Independent Round-Robin for different storage type
     Assert.assertEquals(ssdVolumes.get(0),
-            policy.chooseVolume(ssdVolumes, 0));
+            policy.chooseVolume(ssdVolumes, 0, null));
     // Take block size into consideration
     Assert.assertEquals(ssdVolumes.get(0),
-            policy.chooseVolume(ssdVolumes, 150L));
+            policy.chooseVolume(ssdVolumes, 150L, null));
 
     Assert.assertEquals(diskVolumes.get(1),
-            policy.chooseVolume(diskVolumes, 0));
+            policy.chooseVolume(diskVolumes, 0, null));
     Assert.assertEquals(diskVolumes.get(0),
-            policy.chooseVolume(diskVolumes, 50L));
+            policy.chooseVolume(diskVolumes, 50L, null));
 
     try {
-      policy.chooseVolume(diskVolumes, 200L);
+      policy.chooseVolume(diskVolumes, 200L, null);
       Assert.fail("Should throw an DiskOutOfSpaceException before this!");
     } catch (DiskOutOfSpaceException e) {
       // Pass.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 905c3f0..3293561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -259,7 +259,7 @@ public class TestFsDatasetImpl {
       String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
       try (ReplicaHandler replica =
-          dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
       }
     }
     final String[] dataDirs =
@@ -566,7 +566,7 @@ public class TestFsDatasetImpl {
     class ResponderThread extends Thread {
       public void run() {
         try (ReplicaHandler replica = dataset
-            .createRbw(StorageType.DEFAULT, eb, false)) {
+            .createRbw(StorageType.DEFAULT, null, eb, false)) {
           LOG.info("CreateRbw finished");
           startFinalizeLatch.countDown();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 83c15ca..ee3a79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -101,7 +101,7 @@ public class TestFsVolumeList {
     }
     for (int i = 0; i < 10; i++) {
       try (FsVolumeReference ref =
-          volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+          volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) {
         // volume No.2 will not be chosen.
         assertNotEquals(ref.getVolume(), volumes.get(1));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index da53cae..11525ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -353,7 +353,7 @@ public class TestWriteToReplica {
     }
  
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -371,7 +371,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -381,7 +381,7 @@ public class TestWriteToReplica {
         0L, blocks[RBW].getNumBytes());  // expect to be successful
     
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -397,7 +397,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -413,7 +413,7 @@ public class TestWriteToReplica {
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -430,49 +430,49 @@ public class TestWriteToReplica {
           e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
     }
     
-    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
+    dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false);
   }
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
       Assert.fail("Should not have created a temporary replica that was " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
  
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
       Assert.fail("Should not have created a replica that had created as" +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
-    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
 
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -485,7 +485,8 @@ public class TestWriteToReplica {
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
       ReplicaInPipeline replicaInfo =
-          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+          dataSet.createTemporary(StorageType.DEFAULT, null,
+              blocks[NON_EXISTENT]).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
new file mode 100644
index 0000000..e0f7426
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to ensure that the StorageType and StorageID sent from Namenode
+ * to DFSClient are respected.
+ */
+public class TestNamenodeStorageDirectives {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestNamenodeStorageDirectives.class);
+
+  private static final int BLOCK_SIZE = 512;
+
+  private MiniDFSCluster cluster;
+
+  @After
+  public void tearDown() {
+    shutdown();
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode, StorageType[][] storageTypes)
+      throws IOException {
+    startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode,
+        storageTypes, RoundRobinVolumeChoosingPolicy.class,
+        BlockPlacementPolicyDefault.class);
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode, StorageType[][] storageTypes,
+      Class<? extends VolumeChoosingPolicy> volumeChoosingPolicy,
+      Class<? extends BlockPlacementPolicy> blockPlacementPolicy) throws
+      IOException {
+    shutdown();
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+    /* Allow 1 volume failure */
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setClass(
+        DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
+        volumeChoosingPolicy, VolumeChoosingPolicy.class);
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        blockPlacementPolicy, BlockPlacementPolicy.class);
+
+    MiniDFSNNTopology nnTopology =
+        MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(nnTopology)
+        .numDataNodes(numDataNodes)
+        .storagesPerDatanode(storagePerDataNode)
+        .storageTypes(storageTypes)
+        .build();
+    cluster.waitActive();
+  }
+
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(Path path, int numBlocks, short replicateFactor)
+      throws IOException, InterruptedException, TimeoutException {
+    createFile(0, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks,
+      short replicateFactor)
+      throws IOException, TimeoutException, InterruptedException {
+    final int seed = 0;
+    final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+        replicateFactor, seed);
+    DFSTestUtil.waitReplication(fs, path, replicateFactor);
+  }
+
+  private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks,
+      StorageType storageType) throws IOException {
+    MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0];
+    InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
+    assert addr.getPort() != 0;
+    DFSClient client = new DFSClient(addr, cluster.getConfiguration(0));
+
+    FileSystem fs = cluster.getFileSystem();
+
+    if (!fs.exists(path)) {
+      LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path);
+      return false;
+    }
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    int foundBlocks = 0;
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      for (StorageType st : locatedBlock.getStorageTypes()) {
+        if (st == storageType) {
+          foundBlocks++;
+        }
+      }
+    }
+
+    LOG.info("Found {}/{} blocks on StorageType {}",
+        foundBlocks, numBlocks, storageType);
+    final boolean isValid = foundBlocks >= numBlocks;
+    return isValid;
+  }
+
+  private void testStorageTypes(StorageType[][] storageTypes,
+      String storagePolicy, StorageType[] expectedStorageTypes,
+      StorageType[] unexpectedStorageTypes) throws ReconfigurationException,
+      InterruptedException, TimeoutException, IOException {
+    final int numDataNodes = storageTypes.length;
+    final int storagePerDataNode = storageTypes[0].length;
+    startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes);
+    cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy);
+    Path testFile = new Path("/test");
+    final short replFactor = 2;
+    final int numBlocks = 10;
+    createFile(testFile, numBlocks, replFactor);
+
+    for (StorageType storageType: expectedStorageTypes) {
+      assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks,
+          storageType));
+    }
+
+    for (StorageType storageType: unexpectedStorageTypes) {
+      assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks,
+          storageType));
+    }
+  }
+
+  /**
+   * Verify that writing to SSD and DISK will write to the correct Storage
+   * Types.
+   * @throws IOException
+   */
+  @Test(timeout=60000)
+  public void testTargetStorageTypes() throws ReconfigurationException,
+      InterruptedException, TimeoutException, IOException {
+    // DISK and not anything else.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "ONE_SSD",
+        new StorageType[]{StorageType.SSD, StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE});
+    // only on SSD.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "ALL_SSD",
+        new StorageType[]{StorageType.SSD},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+            StorageType.ARCHIVE});
+    // only on SSD.
+    testStorageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK, StorageType.DISK}},
+        "ALL_SSD",
+        new StorageType[]{StorageType.SSD},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+            StorageType.ARCHIVE});
+
+    // DISK and not anything else.
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "HOT",
+        new StorageType[]{StorageType.DISK},
+        new StorageType[] {StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE});
+
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+        "WARM",
+        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD});
+
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+        "COLD",
+        new StorageType[]{StorageType.ARCHIVE},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.DISK});
+
+    // We wait for Lasy Persist to write to disk.
+    testStorageTypes(new StorageType[][] {
+            {StorageType.RAM_DISK, StorageType.SSD},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}},
+        "LAZY_PERSIST",
+        new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE});
+  }
+
+  /**
+   * A VolumeChoosingPolicy test stub used to verify that the storageId passed
+   * in is indeed in the list of volumes.
+   * @param <V>
+   */
+  private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
+      extends RoundRobinVolumeChoosingPolicy<V> {
+    static String expectedStorageId;
+
+    @Override
+    public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+        throws IOException {
+      assertEquals(expectedStorageId, storageId);
+      return super.chooseVolume(volumes, replicaSize, storageId);
+    }
+  }
+
+  private static class TestBlockPlacementPolicy
+      extends BlockPlacementPolicyDefault {
+    static DatanodeStorageInfo[] dnStorageInfosToReturn;
+
+    @Override
+    public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+        Node writer, List<DatanodeStorageInfo> chosenNodes,
+        boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
+        final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
+      return dnStorageInfosToReturn;
+    }
+  }
+
+  private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
+      throws UnregisteredNodeException {
+    if (cluster == null) {
+      return null;
+    }
+    DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
+    DatanodeManager dnManager = cluster.getNamesystem()
+            .getBlockManager().getDatanodeManager();
+    return dnManager.getDatanode(dnId).getStorageInfos()[0];
+  }
+
+  @Test(timeout=60000)
+  public void testStorageIDBlockPlacementSpecific()
+      throws ReconfigurationException, InterruptedException, TimeoutException,
+      IOException {
+    final StorageType[][] storageTypes = {
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+        {StorageType.DISK, StorageType.DISK},
+    };
+    final int numDataNodes = storageTypes.length;
+    final int storagePerDataNode = storageTypes[0].length;
+    startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
+        TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
+    Path testFile = new Path("/test");
+    final short replFactor = 1;
+    final int numBlocks = 10;
+    DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
+    TestBlockPlacementPolicy.dnStorageInfosToReturn =
+        new DatanodeStorageInfo[] {dnInfoToUse};
+    TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
+    //file creation invokes both BlockPlacementPolicy and VolumeChoosingPolicy,
+    //and will test that the storage ids match
+    createFile(testFile, numBlocks, replFactor);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org