Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/05/11 16:58:46 UTC
[05/50] [abbrv] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index e7f0228..75baf84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -81,10 +81,11 @@ class FsVolumeList {
return Collections.unmodifiableList(volumes);
}
- private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
- throws IOException {
+ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
+ long blockSize, String storageId) throws IOException {
while (true) {
- FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
+ FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
+ storageId);
try {
return volume.obtainReference();
} catch (ClosedChannelException e) {
@@ -100,18 +101,20 @@ class FsVolumeList {
* Get next volume.
*
* @param blockSize free space needed on the volume
- * @param storageType the desired {@link StorageType}
+ * @param storageType the desired {@link StorageType}
+ * @param storageId the storage id which may or may not be used by
+ * the VolumeChoosingPolicy.
* @return next volume to store the block in.
*/
- FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
- throws IOException {
+ FsVolumeReference getNextVolume(StorageType storageType, String storageId,
+ long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
for(FsVolumeImpl v : volumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
}
- return chooseVolume(list, blockSize);
+ return chooseVolume(list, blockSize, storageId);
}
/**
@@ -129,7 +132,7 @@ class FsVolumeList {
list.add(v);
}
}
- return chooseVolume(list, blockSize);
+ return chooseVolume(list, blockSize, null);
}
long getDfsUsed() throws IOException {
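
For context, the three-argument VolumeChoosingPolicy#chooseVolume(List, long, String) call introduced above exposes the NameNode-chosen storage ID to pluggable volume-choosing policies. The sketch below is a hypothetical policy, not part of this patch; it assumes FsVolumeSpi#getStorageID() and #getAvailable(), prefers the hinted volume when it has enough room, and otherwise falls back to the stock round-robin policy.

// Hypothetical illustration only, written against the post-patch
// VolumeChoosingPolicy#chooseVolume(List, long, String) signature shown above.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;

public class StorageIdAwarePolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V> {

  private final RoundRobinVolumeChoosingPolicy<V> fallback =
      new RoundRobinVolumeChoosingPolicy<>();

  @Override
  public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
      throws IOException {
    if (storageId != null) {
      for (V vol : volumes) {
        // Prefer the volume the NameNode targeted, if it can hold the replica.
        if (storageId.equals(vol.getStorageID())
            && vol.getAvailable() >= replicaSize) {
          return vol;
        }
      }
    }
    // No usable match, or no hint given: ordinary round-robin selection.
    return fallback.chooseVolume(volumes, replicaSize, storageId);
  }
}
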
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 74cdeae..c98a336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1018,7 +1018,8 @@ public class DFSTestUtil {
// send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT},
+ new String[0]);
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index b6884da..3a8fb59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -1448,12 +1448,33 @@ public class TestBlockStoragePolicy {
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
StorageType.SSD, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK}, false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.DISK, StorageType.SSD},
+ new StorageType[]{StorageType.SSD},
+ true);
+
+ testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK},
+ new StorageType[]{StorageType.DISK}, false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE},
+ new StorageType[]{StorageType.DISK},
+ false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE},
+ new StorageType[]{StorageType.DISK},
+ false);
+
}
private void testStorageTypeCheckAccessResult(StorageType[] requested,
StorageType[] allowed, boolean expAccess) {
try {
- BlockTokenSecretManager.checkAccess(requested, allowed);
+ BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes");
if (!expAccess) {
fail("No expected access with allowed StorageTypes "
+ Arrays.toString(allowed) + " and requested StorageTypes "
@@ -1467,4 +1488,56 @@ public class TestBlockStoragePolicy {
}
}
}
+
+ @Test
+ public void testStorageIDCheckAccess() {
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1"},
+ new String[]{"DN1-Storage1"}, true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+ new String[]{"DN1-Storage1"},
+ true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+ new String[]{"DN1-Storage1", "DN1-Storage2"}, false);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1", "DN1-Storage2"},
+ new String[]{"DN1-Storage1"}, true);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1", "DN1-Storage2"},
+ new String[]{"DN2-Storage1"}, false);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage2", "DN2-Storage2"},
+ new String[]{"DN1-Storage1", "DN2-Storage1"}, false);
+
+ testStorageIDCheckAccessResult(new String[0], new String[0], false);
+
+ testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"},
+ true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0],
+ false);
+ }
+
+ private void testStorageIDCheckAccessResult(String[] requested,
+ String[] allowed, boolean expAccess) {
+ try {
+ BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs");
+ if (!expAccess) {
+ fail("No expected access with allowed StorageIDs"
+ + Arrays.toString(allowed) + " and requested StorageIDs"
+ + Arrays.toString(requested));
+ }
+ } catch (SecretManager.InvalidToken e) {
+ if (expAccess) {
+ fail("Expected access with allowed StorageIDs "
+ + Arrays.toString(allowed) + " and requested StorageIDs"
+ + Arrays.toString(requested));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 3f4fe28..7a2ac1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -559,6 +559,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false,
+ null, null, new String[0]);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..e159914 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,11 +98,11 @@ public class TestWriteBlockGetsBlockLengthHint {
* correctly propagate the hint to FsDatasetSpi.
*/
@Override
- public synchronized ReplicaHandler createRbw(
- StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+ public synchronized ReplicaHandler createRbw(StorageType storageType,
+ String storageId, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
- return super.createRbw(storageType, b, allowLazyPersist);
+ return super.createRbw(storageType, storageId, b, allowLazyPersist);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index e98207f..747f295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -151,7 +151,7 @@ public class TestBlockToken {
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
BlockTokenIdentifier.AccessMode.WRITE,
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, null);
result = id.getBlockId();
}
return GetReplicaVisibleLengthResponseProto.newBuilder()
@@ -160,11 +160,11 @@ public class TestBlockToken {
}
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
- ExtendedBlock block,
- EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
- StorageType... storageTypes) throws IOException {
+ ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
+ StorageType[] storageTypes, String[] storageIds)
+ throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
- storageTypes);
+ storageTypes, storageIds);
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
@@ -178,29 +178,28 @@ public class TestBlockToken {
enableProtobuf);
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block2,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
// We must be backwards compatible when adding storageType
TestWritable.testWritable(generateTokenId(sm, block3,
- EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- (StorageType[]) null));
+ EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.EMPTY_ARRAY));
+ StorageType.EMPTY_ARRAY, null));
}
@Test
@@ -215,35 +214,36 @@ public class TestBlockToken {
private static void checkAccess(BlockTokenSecretManager m,
Token<BlockTokenIdentifier> t, ExtendedBlock blk,
- BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
- m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
+ BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
+ String[] storageIds) throws SecretManager.InvalidToken {
+ m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
}
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
- BlockTokenSecretManager slave, StorageType... storageTypes)
- throws Exception {
+ BlockTokenSecretManager slave, StorageType[] storageTypes,
+ String[] storageIds) throws Exception {
// single-mode tokens
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
// generated by master
Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
- EnumSet.of(mode), storageTypes);
- checkAccess(master, token1, block1, mode);
- checkAccess(slave, token1, block1, mode);
+ EnumSet.of(mode), storageTypes, storageIds);
+ checkAccess(master, token1, block1, mode, storageTypes, storageIds);
+ checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
// generated by slave
Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
- EnumSet.of(mode), storageTypes);
- checkAccess(master, token2, block2, mode);
- checkAccess(slave, token2, block2, mode);
+ EnumSet.of(mode), storageTypes, storageIds);
+ checkAccess(master, token2, block2, mode, storageTypes, storageIds);
+ checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
}
// multi-mode tokens
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- storageTypes);
+ storageTypes, storageIds);
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
- checkAccess(master, mtoken, block3, mode);
- checkAccess(slave, mtoken, block3, mode);
+ checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
+ checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
}
}
@@ -259,18 +259,18 @@ public class TestBlockToken {
ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
// key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
}
@Test
@@ -315,7 +315,7 @@ public class TestBlockToken {
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf);
@@ -365,7 +365,7 @@ public class TestBlockToken {
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf);
server.start();
@@ -451,19 +451,23 @@ public class TestBlockToken {
ExportedBlockKeys keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
+ String[] storageIds = new String[] {"DS-9001"};
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, storageIds);
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
// Test key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, storageIds);
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
}
}
@@ -540,7 +544,7 @@ public class TestBlockToken {
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -605,7 +609,7 @@ public class TestBlockToken {
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.EMPTY_ARRAY);
+ StorageType.EMPTY_ARRAY, new String[0]);
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -699,7 +703,8 @@ public class TestBlockToken {
*/
BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
"blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+ new String[] {"fake-storage-id"}, true);
Calendar cal = new GregorianCalendar();
cal.set(2017, 1, 9, 0, 12, 35);
long datetime = cal.getTimeInMillis();
@@ -749,7 +754,8 @@ public class TestBlockToken {
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.DISK, StorageType.ARCHIVE};
BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
- 123, accessModes, storageTypes, useProto);
+ 123, accessModes, storageTypes, new String[] {"fake-storage-id"},
+ useProto);
ident.setExpiryDate(1487080345L);
BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
assertEquals(ret.getExpiryDate(), 1487080345L);
@@ -760,6 +766,7 @@ public class TestBlockToken {
assertEquals(ret.getAccessModes(),
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
assertArrayEquals(ret.getStorageTypes(), storageTypes);
+ assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"});
}
@Test
@@ -767,5 +774,4 @@ public class TestBlockToken {
testBlockTokenSerialization(false);
testBlockTokenSerialization(true);
}
-
}
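
For context, the sketch below shows the call shape this patch gives token generation and verification once storage IDs are in play, using the generateToken and checkAccess signatures exercised in the test above. The secret manager, block and "DS-9001" values are placeholders, not taken from the patch.

// A minimal usage sketch, assuming the post-patch BlockTokenSecretManager
// signatures shown in TestBlockToken above.
import java.util.EnumSet;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

class BlockTokenStorageIdSketch {
  static void roundTrip(BlockTokenSecretManager sm, ExtendedBlock block)
      throws InvalidToken {
    StorageType[] storageTypes = {StorageType.DEFAULT};
    String[] storageIds = {"DS-9001"};   // illustrative storage ID

    // The token is now bound to storage IDs in addition to storage types.
    Token<BlockTokenIdentifier> token = sm.generateToken(block,
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
        storageTypes, storageIds);

    // checkAccess takes the same extra storageIds argument; the tests above
    // also pass null for tokens issued without the restriction.
    sm.checkAccess(token, null, block,
        BlockTokenIdentifier.AccessMode.WRITE, storageTypes, storageIds);
  }
}
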
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 6810a0b..c9ff572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -389,7 +389,7 @@ public abstract class BlockReportTestBase {
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
- dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
+ dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cd3befd..18b4922 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1023,21 +1023,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(
- StorageType storageType, ExtendedBlock b,
+ StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
- return createTemporary(storageType, b);
+ return createTemporary(storageType, storageId, b);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, String storageId, ExtendedBlock b)
+ throws IOException {
if (isValidBlock(b)) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " is valid, and cannot be written to.");
- }
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " is valid, and cannot be written to.");
+ }
if (isValidRbw(b)) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " is being written, and cannot be written to.");
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " is being written, and cannot be written to.");
}
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
@@ -1419,7 +1420,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
- StorageType targetStorageType) throws IOException {
+ StorageType targetStorageType, String storageId) throws IOException {
// TODO Auto-generated method stub
return null;
}
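
For context, the sketch below shows how a caller now passes the optional storage ID to the FsDatasetSpi write entry points whose overrides are updated above; the dataset, block and "DS-9001" values are illustrative only.

// A minimal sketch, not part of the patch, using the post-patch
// createRbw(StorageType, String, ExtendedBlock, boolean) signature.
import java.io.IOException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class CreateRbwSketch {
  static void create(FsDatasetSpi<?> dataset, ExtendedBlock block)
      throws IOException {
    // "DS-9001" is an illustrative storage ID; callers with no preference
    // pass null instead, as the updated tests in this patch do.
    try (ReplicaHandler handler =
        dataset.createRbw(StorageType.DISK, "DS-9001", block, false)) {
      // write to handler.getReplica() ...
    }
  }
}
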
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 579252b..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -647,7 +647,7 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- dn.data.createRbw(StorageType.DEFAULT, block, false);
+ dn.data.createRbw(StorageType.DEFAULT, null, block, false);
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
try {
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipeline replicaInfo = dn.data.createRbw(
- StorageType.DEFAULT, block, false).getReplica();
+ StorageType.DEFAULT, null, block, false).getReplica();
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
@@ -972,7 +972,7 @@ public class TestBlockRecovery {
// Register this thread as the writer for the recoveringBlock.
LOG.debug("slowWriter creating rbw");
ReplicaHandler replicaHandler =
- spyDN.data.createRbw(StorageType.DISK, block, false);
+ spyDN.data.createRbw(StorageType.DISK, null, block, false);
replicaHandler.close();
LOG.debug("slowWriter created rbw");
// Tell the parent thread to start progressing.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index f811bd8..8992d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -394,7 +394,7 @@ public class TestBlockReplacement {
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
- sourceProxy);
+ sourceProxy, null);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index b2bfe49..8fda664 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -129,7 +129,7 @@ public class TestDataXceiverLazyPersistHint {
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
CachingStrategy.newDefaultStrategy(),
lazyPersist,
- false, null);
+ false, null, null, new String[0]);
}
// Helper functions to setup the mock objects.
@@ -151,7 +151,7 @@ public class TestDataXceiverLazyPersistHint {
any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
anyString(), any(DatanodeInfo.class), any(DataNode.class),
any(DataChecksum.class), any(CachingStrategy.class),
- captor.capture(), anyBoolean());
+ captor.capture(), anyBoolean(), any(String.class));
doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
.getBufferedOutputStream();
return xceiverSpy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index cd86720..38e4287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -167,7 +167,8 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false,
+ null, null, new String[0]);
out.flush();
// close the connection before sending the content of the block
@@ -274,7 +275,7 @@ public class TestDiskError {
dn1.getDatanodeId());
dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
- new StorageType[]{StorageType.DISK});
+ new StorageType[]{StorageType.DISK}, new String[0]);
// Sleep for 1 second so the DataTrasnfer daemon can start transfer.
try {
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..2e69595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -81,7 +81,7 @@ public class TestSimulatedFSDataset {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipeline bInfo = fsdataset.createRbw(
- StorageType.DEFAULT, b, false).getReplica();
+ StorageType.DEFAULT, null, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
ExtendedBlock block = new ExtendedBlock(newbpid,1);
try {
// it will throw an exception if the block pool is not found
- fsdataset.createTemporary(StorageType.DEFAULT, block);
+ fsdataset.createTemporary(StorageType.DEFAULT, null, block);
} catch (IOException ioe) {
// JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 62ef731..2e439d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -138,14 +138,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+ public ReplicaHandler createTemporary(StorageType t, String i,
+ ExtendedBlock b)
throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
@Override
- public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
- throws IOException {
+ public ReplicaHandler createRbw(StorageType storageType, String id,
+ ExtendedBlock b, boolean tf) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
@@ -332,7 +333,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
+ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+ StorageType targetStorageType, String storageId) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
index 9414a0e..24a43e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
@@ -89,10 +89,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// than the threshold of 1MB.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -115,21 +117,29 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// Third volume, again with 3MB free space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// We should alternate assigning between the two volumes with a lot of free
// space.
initPolicy(policy, 1.0f);
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
// All writes should be assigned to the volume with the least free space.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -156,22 +166,30 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// Fourth volume, again with 3MB free space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// We should alternate assigning between the two volumes with a lot of free
// space.
initPolicy(policy, 1.0f);
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+ null));
// We should alternate assigning between the two volumes with less free
// space.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -190,13 +208,14 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// than the threshold of 1MB.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// All writes should be assigned to the volume with the least free space.
// However, if the volume with the least free space doesn't have enough
// space to accept the replica size, and another volume does have enough
// free space, that should be chosen instead.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+ 1024L * 1024L * 2, null));
}
@Test(timeout=60000)
@@ -220,10 +239,11 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
.thenReturn(1024L * 1024L * 3)
.thenReturn(1024L * 1024L * 3)
.thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
-
+
// Should still be able to get a volume for the replica even though the
// available space on the second volume changed.
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+ 100, null));
}
@Test(timeout=60000)
@@ -271,12 +291,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
volumes.add(volume);
}
-
+
initPolicy(policy, preferencePercent);
long lowAvailableSpaceVolumeSelected = 0;
long highAvailableSpaceVolumeSelected = 0;
for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
- FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
+ FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null);
for (int j = 0; j < volumes.size(); j++) {
// Note how many times the first low available volume was selected
if (volume == volumes.get(j) && j == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
index 9b3047f..44e2a30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
@@ -50,20 +50,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
// Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-
+
// Test two rounds of round-robin choosing
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
// The first volume has only 100L space, so the policy should
// wisely choose the second one in case we ask for more.
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150,
+ null));
// Fail if no volume can be chosen?
try {
- policy.chooseVolume(volumes, Long.MAX_VALUE);
+ policy.chooseVolume(volumes, Long.MAX_VALUE, null);
Assert.fail();
} catch (IOException e) {
// Passed.
@@ -93,7 +94,7 @@ public class TestRoundRobinVolumeChoosingPolicy {
int blockSize = 700;
try {
- policy.chooseVolume(volumes, blockSize);
+ policy.chooseVolume(volumes, blockSize, null);
Assert.fail("expected to throw DiskOutOfSpaceException");
} catch(DiskOutOfSpaceException e) {
Assert.assertEquals("Not returnig the expected message",
@@ -137,21 +138,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
Assert.assertEquals(diskVolumes.get(0),
- policy.chooseVolume(diskVolumes, 0));
+ policy.chooseVolume(diskVolumes, 0, null));
// Independent Round-Robin for different storage type
Assert.assertEquals(ssdVolumes.get(0),
- policy.chooseVolume(ssdVolumes, 0));
+ policy.chooseVolume(ssdVolumes, 0, null));
// Take block size into consideration
Assert.assertEquals(ssdVolumes.get(0),
- policy.chooseVolume(ssdVolumes, 150L));
+ policy.chooseVolume(ssdVolumes, 150L, null));
Assert.assertEquals(diskVolumes.get(1),
- policy.chooseVolume(diskVolumes, 0));
+ policy.chooseVolume(diskVolumes, 0, null));
Assert.assertEquals(diskVolumes.get(0),
- policy.chooseVolume(diskVolumes, 50L));
+ policy.chooseVolume(diskVolumes, 50L, null));
try {
- policy.chooseVolume(diskVolumes, 200L);
+ policy.chooseVolume(diskVolumes, 200L, null);
Assert.fail("Should throw an DiskOutOfSpaceException before this!");
} catch (DiskOutOfSpaceException e) {
// Pass.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 905c3f0..3293561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -259,7 +259,7 @@ public class TestFsDatasetImpl {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
try (ReplicaHandler replica =
- dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+ dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
}
}
final String[] dataDirs =
@@ -566,7 +566,7 @@ public class TestFsDatasetImpl {
class ResponderThread extends Thread {
public void run() {
try (ReplicaHandler replica = dataset
- .createRbw(StorageType.DEFAULT, eb, false)) {
+ .createRbw(StorageType.DEFAULT, null, eb, false)) {
LOG.info("CreateRbw finished");
startFinalizeLatch.countDown();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 83c15ca..ee3a79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -101,7 +101,7 @@ public class TestFsVolumeList {
}
for (int i = 0; i < 10; i++) {
try (FsVolumeReference ref =
- volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+ volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) {
// volume No.2 will not be chosen.
assertNotEquals(ref.getVolume(), volumes.get(1));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index da53cae..11525ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -353,7 +353,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@@ -371,7 +371,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@@ -381,7 +381,7 @@ public class TestWriteToReplica {
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@@ -397,7 +397,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -413,7 +413,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -430,49 +430,49 @@ public class TestWriteToReplica {
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
- dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@@ -485,7 +485,8 @@ public class TestWriteToReplica {
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
ReplicaInPipeline replicaInfo =
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+ dataSet.createTemporary(StorageType.DEFAULT, null,
+ blocks[NON_EXISTENT]).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
new file mode 100644
index 0000000..e0f7426
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to ensure that the StorageType and StorageID sent from Namenode
+ * to DFSClient are respected.
+ */
+public class TestNamenodeStorageDirectives {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestNamenodeStorageDirectives.class);
+
+ private static final int BLOCK_SIZE = 512;
+
+ private MiniDFSCluster cluster;
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+ int storagePerDataNode, StorageType[][] storageTypes)
+ throws IOException {
+ startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode,
+ storageTypes, RoundRobinVolumeChoosingPolicy.class,
+ BlockPlacementPolicyDefault.class);
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+ int storagePerDataNode, StorageType[][] storageTypes,
+ Class<? extends VolumeChoosingPolicy> volumeChoosingPolicy,
+ Class<? extends BlockPlacementPolicy> blockPlacementPolicy) throws
+ IOException {
+ shutdown();
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+ /*
+ * Lower the DN heartbeat, DF rate, and recheck interval to one second
+ * so state about failures and datanode death propagates faster.
+ */
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 1000);
+ /* Allow 1 volume failure */
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
+ conf.setClass(
+ DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
+ volumeChoosingPolicy, VolumeChoosingPolicy.class);
+ conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ blockPlacementPolicy, BlockPlacementPolicy.class);
+
+ MiniDFSNNTopology nnTopology =
+ MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(nnTopology)
+ .numDataNodes(numDataNodes)
+ .storagesPerDatanode(storagePerDataNode)
+ .storageTypes(storageTypes)
+ .build();
+ cluster.waitActive();
+ }
+
+ private void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private void createFile(Path path, int numBlocks, short replicateFactor)
+ throws IOException, InterruptedException, TimeoutException {
+ createFile(0, path, numBlocks, replicateFactor);
+ }
+
+ private void createFile(int fsIdx, Path path, int numBlocks,
+ short replicateFactor)
+ throws IOException, TimeoutException, InterruptedException {
+ final int seed = 0;
+ final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+ DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+ replicateFactor, seed);
+ DFSTestUtil.waitReplication(fs, path, replicateFactor);
+ }
+
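+ /**
+ * Check that at least numBlocks block replicas of the file at path are
+ * reported on the given StorageType.
+ */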
+ private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks,
+ StorageType storageType) throws IOException {
+ MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0];
+ InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
+ assert addr.getPort() != 0;
+ DFSClient client = new DFSClient(addr, cluster.getConfiguration(0));
+
+ FileSystem fs = cluster.getFileSystem();
+
+ if (!fs.exists(path)) {
+ LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path);
+ return false;
+ }
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ int foundBlocks = 0;
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ for (StorageType st : locatedBlock.getStorageTypes()) {
+ if (st == storageType) {
+ foundBlocks++;
+ }
+ }
+ }
+
+ LOG.info("Found {}/{} blocks on StorageType {}",
+ foundBlocks, numBlocks, storageType);
+ final boolean isValid = foundBlocks >= numBlocks;
+ return isValid;
+ }
+
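+ /**
+ * Start a cluster with the given storage layout, apply the storage policy
+ * to the root directory, write a file, and verify that replicas appear on
+ * each of the expected storage types and on none of the unexpected ones.
+ */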
+ private void testStorageTypes(StorageType[][] storageTypes,
+ String storagePolicy, StorageType[] expectedStorageTypes,
+ StorageType[] unexpectedStorageTypes) throws ReconfigurationException,
+ InterruptedException, TimeoutException, IOException {
+ final int numDataNodes = storageTypes.length;
+ final int storagePerDataNode = storageTypes[0].length;
+ startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes);
+ cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy);
+ Path testFile = new Path("/test");
+ final short replFactor = 2;
+ final int numBlocks = 10;
+ createFile(testFile, numBlocks, replFactor);
+
+ for (StorageType storageType: expectedStorageTypes) {
+ assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks,
+ storageType));
+ }
+
+ for (StorageType storageType: unexpectedStorageTypes) {
+ assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks,
+ storageType));
+ }
+ }
+
+ /**
+ * Verify that writes under each storage policy land on the expected
+ * storage types and not on any other storage type.
+ * @throws IOException
+ */
+ @Test(timeout=60000)
+ public void testTargetStorageTypes() throws ReconfigurationException,
+ InterruptedException, TimeoutException, IOException {
+ // ONE_SSD: one replica on SSD, the rest on DISK, and nothing else.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "ONE_SSD",
+ new StorageType[]{StorageType.SSD, StorageType.DISK},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE});
+ // ALL_SSD: replicas only on SSD.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "ALL_SSD",
+ new StorageType[]{StorageType.SSD},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+ StorageType.ARCHIVE});
+ // ALL_SSD: replicas only on SSD, even when extra DISK volumes are available.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK, StorageType.DISK}},
+ "ALL_SSD",
+ new StorageType[]{StorageType.SSD},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+ StorageType.ARCHIVE});
+
+ // HOT: all replicas on DISK and not anything else.
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "HOT",
+ new StorageType[]{StorageType.DISK},
+ new StorageType[] {StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE});
+
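+ // WARM: one replica on DISK, the remaining replicas on ARCHIVE.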
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+ "WARM",
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD});
+
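+ // COLD: all replicas on ARCHIVE.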
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+ "COLD",
+ new StorageType[]{StorageType.ARCHIVE},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.DISK});
+
+ // LAZY_PERSIST: we wait for Lazy Persist to write the replicas to DISK.
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "LAZY_PERSIST",
+ new StorageType[]{StorageType.DISK},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE});
+ }
+
+ /**
+ * A VolumeChoosingPolicy test stub used to verify that the storageId
+ * passed to chooseVolume matches the storage id selected by the
+ * BlockPlacementPolicy.
+ * @param <V> the volume type
+ */
+ private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
+ extends RoundRobinVolumeChoosingPolicy<V> {
+ static String expectedStorageId;
+
+ @Override
+ public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+ throws IOException {
+ assertEquals(expectedStorageId, storageId);
+ return super.chooseVolume(volumes, replicaSize, storageId);
+ }
+ }
+
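+ /**
+ * A BlockPlacementPolicy test stub that always returns a fixed set of
+ * DatanodeStorageInfos, so the test controls which storage id the
+ * Namenode hands back to the client.
+ */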
+ private static class TestBlockPlacementPolicy
+ extends BlockPlacementPolicyDefault {
+ static DatanodeStorageInfo[] dnStorageInfosToReturn;
+
+ @Override
+ public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+ Node writer, List<DatanodeStorageInfo> chosenNodes,
+ boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
+ final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
+ return dnStorageInfosToReturn;
+ }
+ }
+
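+ /**
+ * Return the first storage reported by the datanode at dnIndex, as seen
+ * by the Namenode's DatanodeManager.
+ */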
+ private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
+ throws UnregisteredNodeException {
+ if (cluster == null) {
+ return null;
+ }
+ DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
+ DatanodeManager dnManager = cluster.getNamesystem()
+ .getBlockManager().getDatanodeManager();
+ return dnManager.getDatanode(dnId).getStorageInfos()[0];
+ }
+
+ @Test(timeout=60000)
+ public void testStorageIDBlockPlacementSpecific()
+ throws ReconfigurationException, InterruptedException, TimeoutException,
+ IOException {
+ final StorageType[][] storageTypes = {
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ };
+ final int numDataNodes = storageTypes.length;
+ final int storagePerDataNode = storageTypes[0].length;
+ startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
+ TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
+ Path testFile = new Path("/test");
+ final short replFactor = 1;
+ final int numBlocks = 10;
+ DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
+ TestBlockPlacementPolicy.dnStorageInfosToReturn =
+ new DatanodeStorageInfo[] {dnInfoToUse};
+ TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
+ // File creation invokes both the BlockPlacementPolicy and the
+ // VolumeChoosingPolicy, and verifies that the storage ids match.
+ createFile(testFile, numBlocks, replFactor);
+ }
+}