You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/05/05 19:05:17 UTC
[1/2] hadoop git commit: HDFS-9807. Add an optional StorageID to
writes. Contributed by Ewan Higgs
Repository: hadoop
Updated Branches:
refs/heads/trunk 4e6bbd049 -> a3954ccab
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index e7f0228..75baf84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -81,10 +81,11 @@ class FsVolumeList {
return Collections.unmodifiableList(volumes);
}
- private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
- throws IOException {
+ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
+ long blockSize, String storageId) throws IOException {
while (true) {
- FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
+ FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
+ storageId);
try {
return volume.obtainReference();
} catch (ClosedChannelException e) {
@@ -100,18 +101,20 @@ class FsVolumeList {
* Get next volume.
*
* @param blockSize free space needed on the volume
- * @param storageType the desired {@link StorageType}
+ * @param storageType the desired {@link StorageType}
+ * @param storageId the storage id which may or may not be used by
+ * the VolumeChoosingPolicy.
* @return next volume to store the block in.
*/
- FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
- throws IOException {
+ FsVolumeReference getNextVolume(StorageType storageType, String storageId,
+ long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
for(FsVolumeImpl v : volumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
}
- return chooseVolume(list, blockSize);
+ return chooseVolume(list, blockSize, storageId);
}
/**
@@ -129,7 +132,7 @@ class FsVolumeList {
list.add(v);
}
}
- return chooseVolume(list, blockSize);
+ return chooseVolume(list, blockSize, null);
}
long getDfsUsed() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 74cdeae..c98a336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1018,7 +1018,8 @@ public class DFSTestUtil {
// send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT},
+ new String[0]);
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index b6884da..3a8fb59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -1448,12 +1448,33 @@ public class TestBlockStoragePolicy {
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
StorageType.SSD, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK}, false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.DISK, StorageType.SSD},
+ new StorageType[]{StorageType.SSD},
+ true);
+
+ testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK},
+ new StorageType[]{StorageType.DISK}, false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE},
+ new StorageType[]{StorageType.DISK},
+ false);
+
+ testStorageTypeCheckAccessResult(
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE},
+ new StorageType[]{StorageType.DISK},
+ false);
+
}
private void testStorageTypeCheckAccessResult(StorageType[] requested,
StorageType[] allowed, boolean expAccess) {
try {
- BlockTokenSecretManager.checkAccess(requested, allowed);
+ BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes");
if (!expAccess) {
fail("No expected access with allowed StorageTypes "
+ Arrays.toString(allowed) + " and requested StorageTypes "
@@ -1467,4 +1488,56 @@ public class TestBlockStoragePolicy {
}
}
}
+
+ @Test
+ public void testStorageIDCheckAccess() {
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1"},
+ new String[]{"DN1-Storage1"}, true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+ new String[]{"DN1-Storage1"},
+ true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
+ new String[]{"DN1-Storage1", "DN1-Storage2"}, false);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1", "DN1-Storage2"},
+ new String[]{"DN1-Storage1"}, true);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage1", "DN1-Storage2"},
+ new String[]{"DN2-Storage1"}, false);
+
+ testStorageIDCheckAccessResult(
+ new String[]{"DN1-Storage2", "DN2-Storage2"},
+ new String[]{"DN1-Storage1", "DN2-Storage1"}, false);
+
+ testStorageIDCheckAccessResult(new String[0], new String[0], false);
+
+ testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"},
+ true);
+
+ testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0],
+ false);
+ }
+
+ private void testStorageIDCheckAccessResult(String[] requested,
+ String[] allowed, boolean expAccess) {
+ try {
+ BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs");
+ if (!expAccess) { // no InvalidToken was thrown although denial was expected
+ fail("No expected access with allowed StorageIDs "
+ + Arrays.toString(allowed) + " and requested StorageIDs "
+ + Arrays.toString(requested));
+ }
+ } catch (SecretManager.InvalidToken e) {
+ if (expAccess) { // InvalidToken was thrown although access was expected
+ fail("Expected access with allowed StorageIDs "
+ + Arrays.toString(allowed) + " and requested StorageIDs "
+ + Arrays.toString(requested));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 3f4fe28..7a2ac1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -559,6 +559,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false,
+ null, null, new String[0]);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..e159914 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,11 +98,11 @@ public class TestWriteBlockGetsBlockLengthHint {
* correctly propagate the hint to FsDatasetSpi.
*/
@Override
- public synchronized ReplicaHandler createRbw(
- StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+ public synchronized ReplicaHandler createRbw(StorageType storageType,
+ String storageId, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
- return super.createRbw(storageType, b, allowLazyPersist);
+ return super.createRbw(storageType, storageId, b, allowLazyPersist);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index e98207f..747f295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -151,7 +151,7 @@ public class TestBlockToken {
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
BlockTokenIdentifier.AccessMode.WRITE,
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, null);
result = id.getBlockId();
}
return GetReplicaVisibleLengthResponseProto.newBuilder()
@@ -160,11 +160,11 @@ public class TestBlockToken {
}
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
- ExtendedBlock block,
- EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
- StorageType... storageTypes) throws IOException {
+ ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
+ StorageType[] storageTypes, String[] storageIds)
+ throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
- storageTypes);
+ storageTypes, storageIds);
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
@@ -178,29 +178,28 @@ public class TestBlockToken {
enableProtobuf);
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block2,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.DEFAULT));
+ new StorageType[]{StorageType.DEFAULT}, null));
// We must be backwards compatible when adding storageType
TestWritable.testWritable(generateTokenId(sm, block3,
- EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- (StorageType[]) null));
+ EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.EMPTY_ARRAY));
+ StorageType.EMPTY_ARRAY, null));
}
@Test
@@ -215,35 +214,36 @@ public class TestBlockToken {
private static void checkAccess(BlockTokenSecretManager m,
Token<BlockTokenIdentifier> t, ExtendedBlock blk,
- BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
- m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
+ BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
+ String[] storageIds) throws SecretManager.InvalidToken {
+ m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
}
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
- BlockTokenSecretManager slave, StorageType... storageTypes)
- throws Exception {
+ BlockTokenSecretManager slave, StorageType[] storageTypes,
+ String[] storageIds) throws Exception {
// single-mode tokens
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
// generated by master
Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
- EnumSet.of(mode), storageTypes);
- checkAccess(master, token1, block1, mode);
- checkAccess(slave, token1, block1, mode);
+ EnumSet.of(mode), storageTypes, storageIds);
+ checkAccess(master, token1, block1, mode, storageTypes, storageIds);
+ checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
// generated by slave
Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
- EnumSet.of(mode), storageTypes);
- checkAccess(master, token2, block2, mode);
- checkAccess(slave, token2, block2, mode);
+ EnumSet.of(mode), storageTypes, storageIds);
+ checkAccess(master, token2, block2, mode, storageTypes, storageIds);
+ checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
}
// multi-mode tokens
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- storageTypes);
+ storageTypes, storageIds);
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
- checkAccess(master, mtoken, block3, mode);
- checkAccess(slave, mtoken, block3, mode);
+ checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
+ checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
}
}
@@ -259,18 +259,18 @@ public class TestBlockToken {
ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
// key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler,
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, slaveHandler, null);
+ new StorageType[]{StorageType.DEFAULT}, null);
+ tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
}
@Test
@@ -315,7 +315,7 @@ public class TestBlockToken {
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf);
@@ -365,7 +365,7 @@ public class TestBlockToken {
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf);
server.start();
@@ -451,19 +451,23 @@ public class TestBlockToken {
ExportedBlockKeys keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
+ String[] storageIds = new String[] {"DS-9001"};
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, storageIds);
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
// Test key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, storageIds);
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
- StorageType.DEFAULT);
- tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
+ new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
+ tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
+ null);
}
}
@@ -540,7 +544,7 @@ public class TestBlockToken {
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DEFAULT});
+ new StorageType[]{StorageType.DEFAULT}, new String[0]);
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -605,7 +609,7 @@ public class TestBlockToken {
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
- StorageType.EMPTY_ARRAY);
+ StorageType.EMPTY_ARRAY, new String[0]);
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -699,7 +703,8 @@ public class TestBlockToken {
*/
BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
"blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
- new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+ new String[] {"fake-storage-id"}, true);
Calendar cal = new GregorianCalendar();
cal.set(2017, 1, 9, 0, 12, 35);
long datetime = cal.getTimeInMillis();
@@ -749,7 +754,8 @@ public class TestBlockToken {
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.DISK, StorageType.ARCHIVE};
BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
- 123, accessModes, storageTypes, useProto);
+ 123, accessModes, storageTypes, new String[] {"fake-storage-id"},
+ useProto);
ident.setExpiryDate(1487080345L);
BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
assertEquals(ret.getExpiryDate(), 1487080345L);
@@ -760,6 +766,7 @@ public class TestBlockToken {
assertEquals(ret.getAccessModes(),
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
assertArrayEquals(ret.getStorageTypes(), storageTypes);
+ assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"});
}
@Test
@@ -767,5 +774,4 @@ public class TestBlockToken {
testBlockTokenSerialization(false);
testBlockTokenSerialization(true);
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 6810a0b..c9ff572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -389,7 +389,7 @@ public abstract class BlockReportTestBase {
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
- dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
+ dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cd3befd..18b4922 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1023,21 +1023,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(
- StorageType storageType, ExtendedBlock b,
+ StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
- return createTemporary(storageType, b);
+ return createTemporary(storageType, storageId, b);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, String storageId, ExtendedBlock b)
+ throws IOException {
if (isValidBlock(b)) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " is valid, and cannot be written to.");
- }
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " is valid, and cannot be written to.");
+ }
if (isValidRbw(b)) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " is being written, and cannot be written to.");
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " is being written, and cannot be written to.");
}
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
@@ -1419,7 +1420,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
- StorageType targetStorageType) throws IOException {
+ StorageType targetStorageType, String storageId) throws IOException {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 579252b..311d5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -647,7 +647,7 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- dn.data.createRbw(StorageType.DEFAULT, block, false);
+ dn.data.createRbw(StorageType.DEFAULT, null, block, false);
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock);
try {
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipeline replicaInfo = dn.data.createRbw(
- StorageType.DEFAULT, block, false).getReplica();
+ StorageType.DEFAULT, null, block, false).getReplica();
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
@@ -972,7 +972,7 @@ public class TestBlockRecovery {
// Register this thread as the writer for the recoveringBlock.
LOG.debug("slowWriter creating rbw");
ReplicaHandler replicaHandler =
- spyDN.data.createRbw(StorageType.DISK, block, false);
+ spyDN.data.createRbw(StorageType.DISK, null, block, false);
replicaHandler.close();
LOG.debug("slowWriter created rbw");
// Tell the parent thread to start progressing.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index f811bd8..8992d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -394,7 +394,7 @@ public class TestBlockReplacement {
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
- sourceProxy);
+ sourceProxy, null);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index b2bfe49..8fda664 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -129,7 +129,7 @@ public class TestDataXceiverLazyPersistHint {
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
CachingStrategy.newDefaultStrategy(),
lazyPersist,
- false, null);
+ false, null, null, new String[0]);
}
// Helper functions to setup the mock objects.
@@ -151,7 +151,7 @@ public class TestDataXceiverLazyPersistHint {
any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
anyString(), any(DatanodeInfo.class), any(DataNode.class),
any(DataChecksum.class), any(CachingStrategy.class),
- captor.capture(), anyBoolean());
+ captor.capture(), anyBoolean(), any(String.class));
doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
.getBufferedOutputStream();
return xceiverSpy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index cd86720..38e4287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -167,7 +167,8 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false,
+ null, null, new String[0]);
out.flush();
// close the connection before sending the content of the block
@@ -274,7 +275,7 @@ public class TestDiskError {
dn1.getDatanodeId());
dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
- new StorageType[]{StorageType.DISK});
+ new StorageType[]{StorageType.DISK}, new String[0]);
// Sleep for 1 second so the DataTrasnfer daemon can start transfer.
try {
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..2e69595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -81,7 +81,7 @@ public class TestSimulatedFSDataset {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipeline bInfo = fsdataset.createRbw(
- StorageType.DEFAULT, b, false).getReplica();
+ StorageType.DEFAULT, null, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
ExtendedBlock block = new ExtendedBlock(newbpid,1);
try {
// it will throw an exception if the block pool is not found
- fsdataset.createTemporary(StorageType.DEFAULT, block);
+ fsdataset.createTemporary(StorageType.DEFAULT, null, block);
} catch (IOException ioe) {
// JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 62ef731..2e439d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -138,14 +138,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+ public ReplicaHandler createTemporary(StorageType t, String i,
+ ExtendedBlock b)
throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
@Override
- public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
- throws IOException {
+ public ReplicaHandler createRbw(StorageType storageType, String id,
+ ExtendedBlock b, boolean tf) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
@@ -332,7 +333,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
+ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+ StorageType targetStorageType, String storageId) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
index 9414a0e..24a43e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
@@ -89,10 +89,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// than the threshold of 1MB.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -115,21 +117,29 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// Third volume, again with 3MB free space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// We should alternate assigning between the two volumes with a lot of free
// space.
initPolicy(policy, 1.0f);
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
// All writes should be assigned to the volume with the least free space.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -156,22 +166,30 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// Fourth volume, again with 3MB free space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// We should alternate assigning between the two volumes with a lot of free
// space.
initPolicy(policy, 1.0f);
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+ null));
// We should alternate assigning between the two volumes with less free
// space.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+ null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+ null));
}
@Test(timeout=60000)
@@ -190,13 +208,14 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
// than the threshold of 1MB.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-
+
// All writes should be assigned to the volume with the least free space.
// However, if the volume with the least free space doesn't have enough
// space to accept the replica size, and another volume does have enough
// free space, that should be chosen instead.
initPolicy(policy, 0.0f);
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+ 1024L * 1024L * 2, null));
}
@Test(timeout=60000)
@@ -220,10 +239,11 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
.thenReturn(1024L * 1024L * 3)
.thenReturn(1024L * 1024L * 3)
.thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
-
+
// Should still be able to get a volume for the replica even though the
// available space on the second volume changed.
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+ 100, null));
}
@Test(timeout=60000)
@@ -271,12 +291,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy {
Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
volumes.add(volume);
}
-
+
initPolicy(policy, preferencePercent);
long lowAvailableSpaceVolumeSelected = 0;
long highAvailableSpaceVolumeSelected = 0;
for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
- FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
+ FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null);
for (int j = 0; j < volumes.size(); j++) {
// Note how many times the first low available volume was selected
if (volume == volumes.get(j) && j == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
index 9b3047f..44e2a30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
@@ -50,20 +50,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
// Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-
+
// Test two rounds of round-robin choosing
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
// The first volume has only 100L space, so the policy should
// wisely choose the second one in case we ask for more.
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+ Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150,
+ null));
// Fail if no volume can be chosen?
try {
- policy.chooseVolume(volumes, Long.MAX_VALUE);
+ policy.chooseVolume(volumes, Long.MAX_VALUE, null);
Assert.fail();
} catch (IOException e) {
// Passed.
@@ -93,7 +94,7 @@ public class TestRoundRobinVolumeChoosingPolicy {
int blockSize = 700;
try {
- policy.chooseVolume(volumes, blockSize);
+ policy.chooseVolume(volumes, blockSize, null);
Assert.fail("expected to throw DiskOutOfSpaceException");
} catch(DiskOutOfSpaceException e) {
Assert.assertEquals("Not returnig the expected message",
@@ -137,21 +138,21 @@ public class TestRoundRobinVolumeChoosingPolicy {
Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
Assert.assertEquals(diskVolumes.get(0),
- policy.chooseVolume(diskVolumes, 0));
+ policy.chooseVolume(diskVolumes, 0, null));
// Independent Round-Robin for different storage type
Assert.assertEquals(ssdVolumes.get(0),
- policy.chooseVolume(ssdVolumes, 0));
+ policy.chooseVolume(ssdVolumes, 0, null));
// Take block size into consideration
Assert.assertEquals(ssdVolumes.get(0),
- policy.chooseVolume(ssdVolumes, 150L));
+ policy.chooseVolume(ssdVolumes, 150L, null));
Assert.assertEquals(diskVolumes.get(1),
- policy.chooseVolume(diskVolumes, 0));
+ policy.chooseVolume(diskVolumes, 0, null));
Assert.assertEquals(diskVolumes.get(0),
- policy.chooseVolume(diskVolumes, 50L));
+ policy.chooseVolume(diskVolumes, 50L, null));
try {
- policy.chooseVolume(diskVolumes, 200L);
+ policy.chooseVolume(diskVolumes, 200L, null);
Assert.fail("Should throw an DiskOutOfSpaceException before this!");
} catch (DiskOutOfSpaceException e) {
// Pass.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 905c3f0..3293561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -259,7 +259,7 @@ public class TestFsDatasetImpl {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
try (ReplicaHandler replica =
- dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+ dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
}
}
final String[] dataDirs =
@@ -566,7 +566,7 @@ public class TestFsDatasetImpl {
class ResponderThread extends Thread {
public void run() {
try (ReplicaHandler replica = dataset
- .createRbw(StorageType.DEFAULT, eb, false)) {
+ .createRbw(StorageType.DEFAULT, null, eb, false)) {
LOG.info("CreateRbw finished");
startFinalizeLatch.countDown();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 83c15ca..ee3a79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -101,7 +101,7 @@ public class TestFsVolumeList {
}
for (int i = 0; i < 10; i++) {
try (FsVolumeReference ref =
- volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+ volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) {
// volume No.2 will not be chosen.
assertNotEquals(ref.getVolume(), volumes.get(1));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index da53cae..11525ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -353,7 +353,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@@ -371,7 +371,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@@ -381,7 +381,7 @@ public class TestWriteToReplica {
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@@ -397,7 +397,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -413,7 +413,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -430,49 +430,49 @@ public class TestWriteToReplica {
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
- dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
+ dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@@ -485,7 +485,8 @@ public class TestWriteToReplica {
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
ReplicaInPipeline replicaInfo =
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+ dataSet.createTemporary(StorageType.DEFAULT, null,
+ blocks[NON_EXISTENT]).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
new file mode 100644
index 0000000..e0f7426
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to ensure that the StorageType and StorageID sent from Namenode
+ * to DFSClient are respected.
+ */
+public class TestNamenodeStorageDirectives {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestNamenodeStorageDirectives.class);
+
+ private static final int BLOCK_SIZE = 512;
+
+ private MiniDFSCluster cluster;
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+ int storagePerDataNode, StorageType[][] storageTypes)
+ throws IOException {
+ startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode,
+ storageTypes, RoundRobinVolumeChoosingPolicy.class,
+ BlockPlacementPolicyDefault.class);
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+ int storagePerDataNode, StorageType[][] storageTypes,
+ Class<? extends VolumeChoosingPolicy> volumeChoosingPolicy,
+ Class<? extends BlockPlacementPolicy> blockPlacementPolicy) throws
+ IOException {
+ shutdown();
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+ /*
+ * Lower the DN heartbeat, DF rate, and recheck interval to one second
+ * so state about failures and datanode death propagates faster.
+ */
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 1000);
+ /* Allow 1 volume failure */
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
+ conf.setClass(
+ DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
+ volumeChoosingPolicy, VolumeChoosingPolicy.class);
+ conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ blockPlacementPolicy, BlockPlacementPolicy.class);
+
+ MiniDFSNNTopology nnTopology =
+ MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(nnTopology)
+ .numDataNodes(numDataNodes)
+ .storagesPerDatanode(storagePerDataNode)
+ .storageTypes(storageTypes)
+ .build();
+ cluster.waitActive();
+ }
+
+ private void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private void createFile(Path path, int numBlocks, short replicateFactor)
+ throws IOException, InterruptedException, TimeoutException {
+ createFile(0, path, numBlocks, replicateFactor);
+ }
+
+ private void createFile(int fsIdx, Path path, int numBlocks,
+ short replicateFactor)
+ throws IOException, TimeoutException, InterruptedException {
+ final int seed = 0;
+ final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+ DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+ replicateFactor, seed);
+ DFSTestUtil.waitReplication(fs, path, replicateFactor);
+ }
+
+ private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks,
+ StorageType storageType) throws IOException {
+ MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0];
+ InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
+ assert addr.getPort() != 0;
+ DFSClient client = new DFSClient(addr, cluster.getConfiguration(0));
+
+ FileSystem fs = cluster.getFileSystem();
+
+ if (!fs.exists(path)) {
+ LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path);
+ return false;
+ }
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ int foundBlocks = 0;
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ for (StorageType st : locatedBlock.getStorageTypes()) {
+ if (st == storageType) {
+ foundBlocks++;
+ }
+ }
+ }
+
+ LOG.info("Found {}/{} blocks on StorageType {}",
+ foundBlocks, numBlocks, storageType);
+ final boolean isValid = foundBlocks >= numBlocks;
+ return isValid;
+ }
+
+ private void testStorageTypes(StorageType[][] storageTypes,
+ String storagePolicy, StorageType[] expectedStorageTypes,
+ StorageType[] unexpectedStorageTypes) throws ReconfigurationException,
+ InterruptedException, TimeoutException, IOException {
+ final int numDataNodes = storageTypes.length;
+ final int storagePerDataNode = storageTypes[0].length;
+ startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes);
+ cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy);
+ Path testFile = new Path("/test");
+ final short replFactor = 2;
+ final int numBlocks = 10;
+ createFile(testFile, numBlocks, replFactor);
+
+ for (StorageType storageType: expectedStorageTypes) {
+ assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks,
+ storageType));
+ }
+
+ for (StorageType storageType: unexpectedStorageTypes) {
+ assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks,
+ storageType));
+ }
+ }
+
+ /**
+ * Verify that writing to SSD and DISK will write to the correct Storage
+ * Types.
+ * @throws IOException
+ */
+ @Test(timeout=60000)
+ public void testTargetStorageTypes() throws ReconfigurationException,
+ InterruptedException, TimeoutException, IOException {
+ // One replica on SSD, the rest on DISK, and not anything else.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "ONE_SSD",
+ new StorageType[]{StorageType.SSD, StorageType.DISK},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE});
+ // only on SSD.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "ALL_SSD",
+ new StorageType[]{StorageType.SSD},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+ StorageType.ARCHIVE});
+ // only on SSD.
+ testStorageTypes(new StorageType[][]{
+ {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK, StorageType.DISK}},
+ "ALL_SSD",
+ new StorageType[]{StorageType.SSD},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
+ StorageType.ARCHIVE});
+
+ // DISK and not anything else.
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "HOT",
+ new StorageType[]{StorageType.DISK},
+ new StorageType[] {StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE});
+
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+ "WARM",
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD});
+
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+ "COLD",
+ new StorageType[]{StorageType.ARCHIVE},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.DISK});
+
+ // We wait for Lazy Persist to write to disk.
+ testStorageTypes(new StorageType[][] {
+ {StorageType.RAM_DISK, StorageType.SSD},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}},
+ "LAZY_PERSIST",
+ new StorageType[]{StorageType.DISK},
+ new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+ StorageType.ARCHIVE});
+ }
+
+ /**
+ * A VolumeChoosingPolicy test stub used to verify that the storageId passed
+ * in matches the storage id the test expects.
+ * @param <V>
+ */
+ private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
+ extends RoundRobinVolumeChoosingPolicy<V> {
+ static String expectedStorageId;
+
+ @Override
+ public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+ throws IOException {
+ assertEquals(expectedStorageId, storageId);
+ return super.chooseVolume(volumes, replicaSize, storageId);
+ }
+ }
+
+ private static class TestBlockPlacementPolicy
+ extends BlockPlacementPolicyDefault {
+ static DatanodeStorageInfo[] dnStorageInfosToReturn;
+
+ @Override
+ public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+ Node writer, List<DatanodeStorageInfo> chosenNodes,
+ boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
+ final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
+ return dnStorageInfosToReturn;
+ }
+ }
+
+ private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
+ throws UnregisteredNodeException {
+ if (cluster == null) {
+ return null;
+ }
+ DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
+ DatanodeManager dnManager = cluster.getNamesystem()
+ .getBlockManager().getDatanodeManager();
+ return dnManager.getDatanode(dnId).getStorageInfos()[0];
+ }
+
+ @Test(timeout=60000)
+ public void testStorageIDBlockPlacementSpecific()
+ throws ReconfigurationException, InterruptedException, TimeoutException,
+ IOException {
+ final StorageType[][] storageTypes = {
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ };
+ final int numDataNodes = storageTypes.length;
+ final int storagePerDataNode = storageTypes[0].length;
+ startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
+ TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
+ Path testFile = new Path("/test");
+ final short replFactor = 1;
+ final int numBlocks = 10;
+ DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
+ TestBlockPlacementPolicy.dnStorageInfosToReturn =
+ new DatanodeStorageInfo[] {dnInfoToUse};
+ TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
+ //file creation invokes both BlockPlacementPolicy and VolumeChoosingPolicy,
+ //and will test that the storage ids match
+ createFile(testFile, numBlocks, replFactor);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HDFS-9807. Add an optional StorageID to
writes. Contributed by Ewan Higgs
Posted by cd...@apache.org.
HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3954cca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3954cca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3954cca
Branch: refs/heads/trunk
Commit: a3954ccab148bddc290cb96528e63ff19799bcc9
Parents: 4e6bbd0
Author: Chris Douglas <cd...@apache.org>
Authored: Fri May 5 12:01:26 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri May 5 12:01:26 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DataStreamer.java | 35 +-
.../apache/hadoop/hdfs/StripedDataStreamer.java | 10 +-
.../datatransfer/DataTransferProtocol.java | 19 +-
.../hdfs/protocol/datatransfer/Sender.java | 29 +-
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 13 +
.../token/block/BlockTokenIdentifier.java | 36 +-
.../src/main/proto/datatransfer.proto | 4 +
.../src/main/proto/hdfs.proto | 1 +
.../hdfs/protocol/datatransfer/Receiver.java | 20 +-
.../block/BlockPoolTokenSecretManager.java | 22 +-
.../token/block/BlockTokenSecretManager.java | 55 ++--
.../hadoop/hdfs/server/balancer/Dispatcher.java | 5 +-
.../hadoop/hdfs/server/balancer/KeyManager.java | 4 +-
.../server/blockmanagement/BlockManager.java | 6 +-
.../hdfs/server/datanode/BPOfferService.java | 3 +-
.../hdfs/server/datanode/BlockReceiver.java | 12 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 42 ++-
.../hdfs/server/datanode/DataXceiver.java | 68 ++--
.../erasurecode/ErasureCodingWorker.java | 3 +-
.../erasurecode/StripedBlockReader.java | 2 +-
.../erasurecode/StripedBlockWriter.java | 10 +-
.../erasurecode/StripedReconstructionInfo.java | 16 +-
.../datanode/erasurecode/StripedWriter.java | 5 +-
.../AvailableSpaceVolumeChoosingPolicy.java | 20 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 6 +-
.../RoundRobinVolumeChoosingPolicy.java | 2 +-
.../fsdataset/VolumeChoosingPolicy.java | 5 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 21 +-
.../datanode/fsdataset/impl/FsVolumeList.java | 19 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 +-
.../hadoop/hdfs/TestBlockStoragePolicy.java | 75 ++++-
.../hadoop/hdfs/TestDataTransferProtocol.java | 3 +-
.../hdfs/TestWriteBlockGetsBlockLengthHint.java | 6 +-
.../security/token/block/TestBlockToken.java | 98 +++---
.../server/datanode/BlockReportTestBase.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 19 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 6 +-
.../server/datanode/TestBlockReplacement.java | 2 +-
.../TestDataXceiverLazyPersistHint.java | 4 +-
.../hdfs/server/datanode/TestDiskError.java | 5 +-
.../server/datanode/TestSimulatedFSDataset.java | 4 +-
.../extdataset/ExternalDatasetImpl.java | 10 +-
.../TestAvailableSpaceVolumeChoosingPolicy.java | 76 +++--
.../TestRoundRobinVolumeChoosingPolicy.java | 29 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 4 +-
.../fsdataset/impl/TestFsVolumeList.java | 2 +-
.../fsdataset/impl/TestWriteToReplica.java | 29 +-
.../namenode/TestNamenodeStorageDirectives.java | 330 +++++++++++++++++++
48 files changed, 903 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0268537..49c17b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -174,10 +174,12 @@ class DataStreamer extends Daemon {
void sendTransferBlock(final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
+ final String[] targetStorageIDs,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
- dfsClient.clientName, targets, targetStorageTypes);
+ dfsClient.clientName, targets, targetStorageTypes,
+ targetStorageIDs);
out.flush();
//ack
BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -1367,9 +1369,11 @@ class DataStreamer extends Daemon {
final DatanodeInfo src = original[tried % original.length];
final DatanodeInfo[] targets = {nodes[d]};
final StorageType[] targetStorageTypes = {storageTypes[d]};
+ final String[] targetStorageIDs = {storageIDs[d]};
try {
- transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+ transfer(src, targets, targetStorageTypes, targetStorageIDs,
+ lb.getBlockToken());
} catch (IOException ioe) {
DFSClient.LOG.warn("Error transferring data from " + src + " to " +
nodes[d] + ": " + ioe.getMessage());
@@ -1400,6 +1404,7 @@ class DataStreamer extends Daemon {
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
+ final String[] targetStorageIDs,
final Token<BlockTokenIdentifier> blockToken)
throws IOException {
//transfer replica to the new datanode
@@ -1412,7 +1417,8 @@ class DataStreamer extends Daemon {
streams = new StreamerStreams(src, writeTimeout, readTimeout,
blockToken);
- streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+ streams.sendTransferBlock(targets, targetStorageTypes,
+ targetStorageIDs, blockToken);
return;
} catch (InvalidEncryptionKeyException e) {
policy.recordFailure(e);
@@ -1440,11 +1446,12 @@ class DataStreamer extends Daemon {
streamerClosed = true;
return;
}
- setupPipelineInternal(nodes, storageTypes);
+ setupPipelineInternal(nodes, storageTypes, storageIDs);
}
protected void setupPipelineInternal(DatanodeInfo[] datanodes,
- StorageType[] nodeStorageTypes) throws IOException {
+ StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+ throws IOException {
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
@@ -1465,7 +1472,8 @@ class DataStreamer extends Daemon {
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
- success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+ success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
+ isRecovery);
failPacket4Testing();
@@ -1601,7 +1609,8 @@ class DataStreamer extends Daemon {
protected LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb;
DatanodeInfo[] nodes;
- StorageType[] storageTypes;
+ StorageType[] nextStorageTypes;
+ String[] nextStorageIDs;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1617,10 +1626,12 @@ class DataStreamer extends Daemon {
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
- storageTypes = lb.getStorageTypes();
+ nextStorageTypes = lb.getStorageTypes();
+ nextStorageIDs = lb.getStorageIDs();
// Connect to first DataNode in the list.
- success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+ success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
+ 0L, false);
if (!success) {
LOG.warn("Abandoning " + block);
@@ -1643,7 +1654,8 @@ class DataStreamer extends Daemon {
// Returns true on success, otherwise returns false.
//
boolean createBlockOutputStream(DatanodeInfo[] nodes,
- StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+ StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
+ long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
LOG.info("nodes are empty for write pipeline of " + block);
return false;
@@ -1696,7 +1708,8 @@ class DataStreamer extends Daemon {
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
- (targetPinnings != null && targetPinnings[0]), targetPinnings);
+ (targetPinnings != null && targetPinnings[0]), targetPinnings,
+ nodeStorageIDs[0], nodeStorageIDs);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index b457edb..d920f18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -100,9 +100,11 @@ public class StripedDataStreamer extends DataStreamer {
DatanodeInfo[] nodes = lb.getLocations();
StorageType[] storageTypes = lb.getStorageTypes();
+ String[] storageIDs = lb.getStorageIDs();
// Connect to the DataNode. If this fails, the internal error state will be set.
- success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+ success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
+ false);
if (!success) {
block.setCurrentBlock(null);
@@ -121,7 +123,8 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected void setupPipelineInternal(DatanodeInfo[] nodes,
- StorageType[] nodeStorageTypes) throws IOException {
+ StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+ throws IOException {
boolean success = false;
while (!success && !streamerClosed() && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
@@ -141,7 +144,8 @@ public class StripedDataStreamer extends DataStreamer {
// set up the pipeline again with the remaining nodes. when a striped
// data streamer comes here, it must be in external error state.
assert getErrorState().hasExternalError();
- success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+ success = createBlockOutputStream(nodes, nodeStorageTypes,
+ nodeStorageIDs, newGS, true);
failPacket4Testing();
getErrorState().checkRestartingNodeDeadline(nodes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 6c5883c..fe20c37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -101,6 +101,11 @@ public interface DataTransferProtocol {
* written to disk lazily
* @param pinning whether to pin the block, so Balancer won't move it.
* @param targetPinnings whether to pin the block on target datanode
+ * @param storageID optional StorageID designating where to write the
+ * block. An empty String or null indicates that it
+ * has not been provided.
+ * @param targetStorageIDs target StorageIDs corresponding to the target
+ * datanodes.
*/
void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
@@ -118,7 +123,9 @@ public interface DataTransferProtocol {
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
final boolean pinning,
- final boolean[] targetPinnings) throws IOException;
+ final boolean[] targetPinnings,
+ final String storageID,
+ final String[] targetStorageIDs) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
@@ -129,12 +136,15 @@ public interface DataTransferProtocol {
* @param blockToken security token for accessing the block.
* @param clientName client's name.
* @param targets target datanodes.
+ * @param targetStorageIDs StorageIDs designating where to write the
+ * block on the corresponding target datanodes.
*/
void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes) throws IOException;
+ final StorageType[] targetStorageTypes,
+ final String[] targetStorageIDs) throws IOException;
/**
* Request short circuit access file descriptors from a DataNode.
@@ -179,12 +189,15 @@ public interface DataTransferProtocol {
* @param blockToken security token for accessing the block.
* @param delHint the hint for deleting the block in the original datanode.
* @param source the source datanode for receiving the block.
+ * @param storageId an optional storage ID designating where the
+ * received block is stored.
*/
void replaceBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
- final DatanodeInfo source) throws IOException;
+ final DatanodeInfo source,
+ final String storageId) throws IOException;
/**
* Copy a block.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index e133975..8a8d20d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -132,7 +133,9 @@ public class Sender implements DataTransferProtocol {
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
final boolean pinning,
- final boolean[] targetPinnings) throws IOException {
+ final boolean[] targetPinnings,
+ final String storageId,
+ final String[] targetStorageIds) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -154,11 +157,14 @@ public class Sender implements DataTransferProtocol {
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist)
.setPinning(pinning)
- .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
-
+ .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1))
+ .addAllTargetStorageIds(PBHelperClient.convert(targetStorageIds, 1));
if (source != null) {
proto.setSource(PBHelperClient.convertDatanodeInfo(source));
}
+ if (storageId != null) {
+ proto.setStorageId(storageId);
+ }
send(out, Op.WRITE_BLOCK, proto.build());
}
@@ -168,7 +174,8 @@ public class Sender implements DataTransferProtocol {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes) throws IOException {
+ final StorageType[] targetStorageTypes,
+ final String[] targetStorageIds) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
@@ -176,6 +183,7 @@ public class Sender implements DataTransferProtocol {
.addAllTargets(PBHelperClient.convert(targets))
.addAllTargetStorageTypes(
PBHelperClient.convertStorageTypes(targetStorageTypes))
+ .addAllTargetStorageIds(Arrays.asList(targetStorageIds))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@@ -233,15 +241,18 @@ public class Sender implements DataTransferProtocol {
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
- final DatanodeInfo source) throws IOException {
- OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+ final DatanodeInfo source,
+ final String storageId) throws IOException {
+ OpReplaceBlockProto.Builder proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setStorageType(PBHelperClient.convertStorageType(storageType))
.setDelHint(delHint)
- .setSource(PBHelperClient.convertDatanodeInfo(source))
- .build();
+ .setSource(PBHelperClient.convertDatanodeInfo(source));
+ if (storageId != null) {
+ proto.setStorageId(storageId);
+ }
- send(out, Op.REPLACE_BLOCK, proto);
+ send(out, Op.REPLACE_BLOCK, proto.build());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 2b8f102..614f653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -345,6 +345,16 @@ public class PBHelperClient {
return pinnings;
}
+ public static List<String> convert(String[] targetIds, int idx) {
+ List<String> ids = new ArrayList<>();
+ if (targetIds != null) {
+ for (; idx < targetIds.length; ++idx) {
+ ids.add(targetIds[idx]);
+ }
+ }
+ return ids;
+ }
+
public static ExtendedBlock convert(ExtendedBlockProto eb) {
if (eb == null) return null;
return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
@@ -640,6 +650,9 @@ public class PBHelperClient {
for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
builder.addStorageTypes(convertStorageType(storageType));
}
+ for (String storageId : blockTokenSecret.getStorageIds()) {
+ builder.addStorageIds(storageId);
+ }
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 228a7b6..5950752 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -53,16 +53,19 @@ public class BlockTokenIdentifier extends TokenIdentifier {
private long blockId;
private final EnumSet<AccessMode> modes;
private StorageType[] storageTypes;
+ private String[] storageIds;
private boolean useProto;
private byte [] cache;
public BlockTokenIdentifier() {
- this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
+ this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, null,
+ false);
}
public BlockTokenIdentifier(String userId, String bpid, long blockId,
- EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
+ EnumSet<AccessMode> modes, StorageType[] storageTypes,
+ String[] storageIds, boolean useProto) {
this.cache = null;
this.userId = userId;
this.blockPoolId = bpid;
@@ -70,6 +73,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
this.storageTypes = Optional.ofNullable(storageTypes)
.orElse(StorageType.EMPTY_ARRAY);
+ this.storageIds = Optional.ofNullable(storageIds)
+ .orElse(new String[0]);
this.useProto = useProto;
}
@@ -125,6 +130,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
return storageTypes;
}
+ public String[] getStorageIds(){
+ return storageIds;
+ }
+
@Override
public String toString() {
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
@@ -132,7 +141,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
+ ", blockPoolId=" + this.getBlockPoolId()
+ ", blockId=" + this.getBlockId() + ", access modes="
+ this.getAccessModes() + ", storageTypes= "
- + Arrays.toString(this.getStorageTypes()) + ")";
+ + Arrays.toString(this.getStorageTypes()) + ", storageIds= "
+ + Arrays.toString(this.getStorageIds()) + ")";
}
static boolean isEqual(Object a, Object b) {
@@ -151,7 +161,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
&& isEqual(this.blockPoolId, that.blockPoolId)
&& this.blockId == that.blockId
&& isEqual(this.modes, that.modes)
- && Arrays.equals(this.storageTypes, that.storageTypes);
+ && Arrays.equals(this.storageTypes, that.storageTypes)
+ && Arrays.equals(this.storageIds, that.storageIds);
}
return false;
}
@@ -161,7 +172,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
^ (userId == null ? 0 : userId.hashCode())
^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
- ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
+ ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes))
+ ^ (storageIds == null ? 0 : Arrays.hashCode(storageIds));
}
/**
@@ -220,6 +232,14 @@ public class BlockTokenIdentifier extends TokenIdentifier {
readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
}
storageTypes = readStorageTypes;
+
+ length = WritableUtils.readVInt(in);
+ String[] readStorageIds = new String[length];
+ for (int i = 0; i < length; i++) {
+ readStorageIds[i] = WritableUtils.readString(in);
+ }
+ storageIds = readStorageIds;
+
useProto = false;
}
@@ -248,6 +268,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
.map(PBHelperClient::convertStorageType)
.toArray(StorageType[]::new);
+ storageIds = blockTokenSecretProto.getStorageIdsList().stream()
+ .toArray(String[]::new);
useProto = true;
}
@@ -275,6 +297,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
for (StorageType type: storageTypes){
WritableUtils.writeEnum(out, type);
}
+ WritableUtils.writeVInt(out, storageIds.length);
+ for (String id: storageIds) {
+ WritableUtils.writeString(out, id);
+ }
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 889361a..2356201 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -125,12 +125,15 @@ message OpWriteBlockProto {
//whether to pin the block, so Balancer won't move it.
optional bool pinning = 14 [default = false];
repeated bool targetPinnings = 15;
+ optional string storageId = 16;
+ repeated string targetStorageIds = 17;
}
message OpTransferBlockProto {
required ClientOperationHeaderProto header = 1;
repeated DatanodeInfoProto targets = 2;
repeated StorageTypeProto targetStorageTypes = 3;
+ repeated string targetStorageIds = 4;
}
message OpReplaceBlockProto {
@@ -138,6 +141,7 @@ message OpReplaceBlockProto {
required string delHint = 2;
required DatanodeInfoProto source = 3;
optional StorageTypeProto storageType = 4 [default = DISK];
+ optional string storageId = 5;
}
message OpCopyBlockProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 3e27427..08ed3c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -570,4 +570,5 @@ message BlockTokenSecretProto {
optional uint64 blockId = 5;
repeated AccessModeProto modes = 6;
repeated StorageTypeProto storageTypes = 7;
+ repeated string storageIds = 8;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 08ab967..bab2e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -25,7 +25,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -185,7 +187,9 @@ public abstract class Receiver implements DataTransferProtocol {
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
(proto.hasPinning() ? proto.getPinning(): false),
- (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())));
+ (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
+ proto.getStorageId(),
+ proto.getTargetStorageIdsList().toArray(new String[0]));
} finally {
if (traceScope != null) traceScope.close();
}
@@ -199,11 +203,18 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
- transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
+ final ExtendedBlock block =
+ PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock());
+ final StorageType[] targetStorageTypes =
+ PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(),
+ targets.length);
+ transferBlock(block,
PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
- PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+ targetStorageTypes,
+ proto.getTargetStorageIdsList().toArray(new String[0])
+ );
} finally {
if (traceScope != null) traceScope.close();
}
@@ -264,7 +275,8 @@ public abstract class Receiver implements DataTransferProtocol {
PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelperClient.convert(proto.getHeader().getToken()),
proto.getDelHint(),
- PBHelperClient.convert(proto.getSource()));
+ PBHelperClient.convert(proto.getSource()),
+ proto.getStorageId());
} finally {
if (traceScope != null) traceScope.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 29fb73f..8400b4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -84,25 +84,27 @@ public class BlockPoolTokenSecretManager extends
/**
* See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
* String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
- * StorageType[])}
+ * StorageType[], String[])}
*/
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, AccessMode mode,
- StorageType[] storageTypes) throws InvalidToken {
+ StorageType[] storageTypes, String[] storageIds)
+ throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
- storageTypes);
+ storageTypes, storageIds);
}
/**
* See {@link BlockTokenSecretManager#checkAccess(Token, String,
* ExtendedBlock, BlockTokenIdentifier.AccessMode,
- * StorageType[])}.
+ * StorageType[], String[])}
*/
public void checkAccess(Token<BlockTokenIdentifier> token,
String userId, ExtendedBlock block, AccessMode mode,
- StorageType[] storageTypes) throws InvalidToken {
+ StorageType[] storageTypes, String[] storageIds)
+ throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
- storageTypes);
+ storageTypes, storageIds);
}
/**
@@ -115,11 +117,13 @@ public class BlockPoolTokenSecretManager extends
/**
* See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
- * StorageType[])}
+ * StorageType[], String[])}.
*/
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
- EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
- return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
+ EnumSet<AccessMode> of, StorageType[] storageTypes, String[] storageIds)
+ throws IOException {
+ return get(b.getBlockPoolId()).generateToken(b, of, storageTypes,
+ storageIds);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index f3bec83..6b54490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -247,18 +247,19 @@ public class BlockTokenSecretManager extends
/** Generate an block token for current user */
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
EnumSet<BlockTokenIdentifier.AccessMode> modes,
- StorageType[] storageTypes) throws IOException {
+ StorageType[] storageTypes, String[] storageIds) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String userID = (ugi == null ? null : ugi.getShortUserName());
- return generateToken(userID, block, modes, storageTypes);
+ return generateToken(userID, block, modes, storageTypes, storageIds);
}
/** Generate a block token for a specified user */
public Token<BlockTokenIdentifier> generateToken(String userId,
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
- StorageType[] storageTypes) throws IOException {
+ StorageType[] storageTypes, String[] storageIds) throws IOException {
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
- .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
+ .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
+ storageIds, useProto);
return new Token<BlockTokenIdentifier>(id, this);
}
@@ -272,10 +273,13 @@ public class BlockTokenSecretManager extends
*/
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
- StorageType[] storageTypes) throws InvalidToken {
+ StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
checkAccess(id, userId, block, mode);
if (storageTypes != null && storageTypes.length > 0) {
- checkAccess(id.getStorageTypes(), storageTypes);
+ checkAccess(id.getStorageTypes(), storageTypes, "StorageTypes");
+ }
+ if (storageIds != null && storageIds.length > 0) {
+ checkAccess(id.getStorageIds(), storageIds, "StorageIDs");
}
}
@@ -309,30 +313,31 @@ public class BlockTokenSecretManager extends
}
/**
- * Check if the requested StorageTypes match the StorageTypes in the
- * BlockTokenIdentifier.
- * Empty candidateStorageTypes specifiers mean 'all is permitted'. They
- * would otherwise be nonsensical.
+ * Check if the requested values can be satisfied with the values in the
+ * BlockToken. This is intended for use with StorageTypes and StorageIDs.
+ *
+ * The current node can only verify that one of the storage [Type|ID] is
+ * available. The rest will be on different nodes.
*/
- public static void checkAccess(StorageType[] candidateStorageTypes,
- StorageType[] storageTypesRequested) throws InvalidToken {
- if (storageTypesRequested.length == 0) {
- throw new InvalidToken("The request has no StorageTypes. "
+ public static <T> void checkAccess(T[] candidates, T[] requested, String msg)
+ throws InvalidToken {
+ if (requested.length == 0) {
+ throw new InvalidToken("The request has no " + msg + ". "
+ "This is probably a configuration error.");
}
- if (candidateStorageTypes.length == 0) {
+ if (candidates.length == 0) {
return;
}
- List<StorageType> unseenCandidates = new ArrayList<StorageType>();
- unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
- for (StorageType storageType : storageTypesRequested) {
- final int index = unseenCandidates.indexOf(storageType);
+ List<T> unseenCandidates = new ArrayList<T>();
+ unseenCandidates.addAll(Arrays.asList(candidates));
+ for (T req : requested) {
+ final int index = unseenCandidates.indexOf(req);
if (index == -1) {
- throw new InvalidToken("Block token with StorageTypes "
- + Arrays.toString(candidateStorageTypes)
- + " not valid for access with StorageTypes "
- + Arrays.toString(storageTypesRequested));
+ throw new InvalidToken("Block token with " + msg + " "
+ + Arrays.toString(candidates)
+ + " not valid for access with " + msg + " "
+ + Arrays.toString(requested));
}
Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
unseenCandidates.remove(unseenCandidates.size()-1);
@@ -342,7 +347,7 @@ public class BlockTokenSecretManager extends
/** Check if access should be allowed. userID is not checked if null */
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
- StorageType[] storageTypes) throws InvalidToken {
+ StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
BlockTokenIdentifier id = new BlockTokenIdentifier();
try {
id.readFields(new DataInputStream(new ByteArrayInputStream(token
@@ -352,7 +357,7 @@ public class BlockTokenSecretManager extends
"Unable to de-serialize block token identifier for user=" + userId
+ ", block=" + block + ", access mode=" + mode);
}
- checkAccess(id, userId, block, mode, storageTypes);
+ checkAccess(id, userId, block, mode, storageTypes, storageIds);
if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't have the correct token password");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 91dc907..f855e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -357,7 +357,7 @@ public class Dispatcher {
reportedBlock.getBlock());
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
- new StorageType[]{target.storageType});
+ new StorageType[]{target.storageType}, new String[0]);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out;
@@ -411,7 +411,8 @@ public class Dispatcher {
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, target.storageType, accessToken,
- source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
+ source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
+ null);
}
/** Check whether to continue waiting for response */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 06bf07f..faf95b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -95,7 +95,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
/** Get an access token for a block. */
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
- StorageType[] storageTypes) throws IOException {
+ StorageType[] storageTypes, String[] storageIds) throws IOException {
if (!isBlockTokenEnabled) {
return BlockTokenSecretManager.DUMMY_TOKEN;
} else {
@@ -105,7 +105,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
}
return blockTokenSecretManager.generateToken(null, eb,
EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
- BlockTokenIdentifier.AccessMode.COPY), storageTypes);
+ BlockTokenIdentifier.AccessMode.COPY), storageTypes, storageIds);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e63930a..8f58e25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1283,13 +1283,15 @@ public class BlockManager implements BlockStatsMXBean {
internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
blockTokens[i] = blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
- internalBlock, EnumSet.of(mode), b.getStorageTypes());
+ internalBlock, EnumSet.of(mode), b.getStorageTypes(),
+ b.getStorageIDs());
}
sb.setBlockTokens(blockTokens);
} else {
b.setBlockToken(blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
- b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
+ b.getBlock(), EnumSet.of(mode), b.getStorageTypes(),
+ b.getStorageIDs()));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index e0daca7..042169a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -679,7 +679,8 @@ class BPOfferService {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
- bcmd.getTargets(), bcmd.getTargetStorageTypes());
+ bcmd.getTargets(), bcmd.getTargetStorageTypes(),
+ bcmd.getTargetStorageIDs());
break;
case DatanodeProtocol.DNA_INVALIDATE:
//
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 00109e0..2ab4067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -151,7 +151,8 @@ class BlockReceiver implements Closeable {
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
- final boolean pinning) throws IOException {
+ final boolean pinning,
+ final String storageId) throws IOException {
try{
this.block = block;
this.in = in;
@@ -197,6 +198,7 @@ class BlockReceiver implements Closeable {
+ "\n allowLazyPersist=" + allowLazyPersist + ", pinning=" + pinning
+ ", isClient=" + isClient + ", isDatanode=" + isDatanode
+ ", responseInterval=" + responseInterval
+ + ", storageID=" + storageId
);
}
@@ -204,11 +206,13 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaHandler = datanode.data.createTemporary(storageType, block);
+ replicaHandler =
+ datanode.data.createTemporary(storageType, storageId, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+ replicaHandler = datanode.data.createRbw(storageType, storageId,
+ block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
@@ -233,7 +237,7 @@ class BlockReceiver implements Closeable {
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaHandler =
- datanode.data.createTemporary(storageType, block);
+ datanode.data.createTemporary(storageType, storageId, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 66ef89a..2305e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1943,7 +1943,7 @@ public class DataNode extends ReconfigurableBase
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
- null);
+ null, null);
}
}
@@ -2224,7 +2224,8 @@ public class DataNode extends ReconfigurableBase
@VisibleForTesting
void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
- StorageType[] xferTargetStorageTypes) throws IOException {
+ StorageType[] xferTargetStorageTypes, String[] xferTargetStorageIDs)
+ throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -2281,17 +2282,19 @@ public class DataNode extends ReconfigurableBase
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
- new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
+ new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
+ xferTargetStorageIDs, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
- DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
+ DatanodeInfo[][] xferTargets, StorageType[][] xferTargetStorageTypes,
+ String[][] xferTargetStorageIDs) {
for (int i = 0; i < blocks.length; i++) {
try {
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
- xferTargetStorageTypes[i]);
+ xferTargetStorageTypes[i], xferTargetStorageIDs[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -2395,6 +2398,7 @@ public class DataNode extends ReconfigurableBase
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
final StorageType[] targetStorageTypes;
+ final private String[] targetStorageIds;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@@ -2406,8 +2410,8 @@ public class DataNode extends ReconfigurableBase
* entire target list, the block, and the data.
*/
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
- ExtendedBlock b, BlockConstructionStage stage,
- final String clientname) {
+ String[] targetStorageIds, ExtendedBlock b,
+ BlockConstructionStage stage, final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ b + " (numBytes=" + b.getNumBytes() + ")"
@@ -2415,10 +2419,13 @@ public class DataNode extends ReconfigurableBase
+ ", clientname=" + clientname
+ ", targets=" + Arrays.asList(targets)
+ ", target storage types=" + (targetStorageTypes == null ? "[]" :
- Arrays.asList(targetStorageTypes)));
+ Arrays.asList(targetStorageTypes))
+ + ", target storage IDs=" + (targetStorageIds == null ? "[]" :
+ Arrays.asList(targetStorageIds)));
}
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;
+ this.targetStorageIds = targetStorageIds;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -2456,7 +2463,7 @@ public class DataNode extends ReconfigurableBase
//
Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- targetStorageTypes);
+ targetStorageTypes, targetStorageIds);
long writeTimeout = dnConf.socketWriteTimeout +
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2477,10 +2484,13 @@ public class DataNode extends ReconfigurableBase
DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
.build();
+ String storageId = targetStorageIds.length > 0 ?
+ targetStorageIds[0] : null;
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
- false, false, null);
+ false, false, null, storageId,
+ targetStorageIds);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
@@ -2540,12 +2550,12 @@ public class DataNode extends ReconfigurableBase
*/
public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
EnumSet<AccessMode> mode,
- StorageType[] storageTypes) throws IOException {
+ StorageType[] storageTypes, String[] storageIds) throws IOException {
Token<BlockTokenIdentifier> accessToken =
BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
- storageTypes);
+ storageTypes, storageIds);
}
return accessToken;
}
@@ -2918,7 +2928,7 @@ public class DataNode extends ReconfigurableBase
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block,
- BlockTokenIdentifier.AccessMode.READ, null);
+ BlockTokenIdentifier.AccessMode.READ, null, null);
}
}
}
@@ -2934,7 +2944,8 @@ public class DataNode extends ReconfigurableBase
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
- final String client) throws IOException {
+ final String[] targetStorageIds, final String client)
+ throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2967,7 +2978,8 @@ public class DataNode extends ReconfigurableBase
b.setNumBytes(visible);
if (targets.length > 0) {
- new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
+ new DataTransfer(targets, targetStorageTypes, targetStorageIds, b, stage,
+ client).run();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index cc13799..d42e330 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -354,7 +354,8 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName("Passing file descriptors for block " + blk);
DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, blk, token,
- Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
+ Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ,
+ null, null);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null;
SlotId registeredSlotId = null;
@@ -662,7 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -673,7 +674,9 @@ class DataXceiver extends Receiver implements Runnable {
CachingStrategy cachingStrategy,
boolean allowLazyPersist,
final boolean pinning,
- final boolean[] targetPinnings) throws IOException {
+ final boolean[] targetPinnings,
+ final String storageId,
+ final String[] targetStorageIds) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -692,8 +695,15 @@ class DataXceiver extends Receiver implements Runnable {
if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
}
+ int nsi = targetStorageIds.length;
+ String[] storageIds = new String[nsi + 1];
+ storageIds[0] = storageId;
+ if (nsi > 0) {
+ System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
+ }
checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
- BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
+ BlockTokenIdentifier.AccessMode.WRITE,
+ storageTypes, storageIds);
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
@@ -743,7 +753,7 @@ class DataXceiver extends Receiver implements Runnable {
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
- cachingStrategy, allowLazyPersist, pinning));
+ cachingStrategy, allowLazyPersist, pinning, storageId));
replica = blockReceiver.getReplica();
} else {
replica = datanode.data.recoverClose(
@@ -796,16 +806,18 @@ class DataXceiver extends Receiver implements Runnable {
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
- blockToken, clientname, targets, targetStorageTypes, srcDataNode,
- stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy,
- allowLazyPersist, targetPinnings[0], targetPinnings);
+ blockToken, clientname, targets, targetStorageTypes,
+ srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
+ allowLazyPersist, targetPinnings[0], targetPinnings,
+ (targetStorageIds.length > 0 ? targetStorageIds[0] : null), targetStorageIds);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
- blockToken, clientname, targets, targetStorageTypes, srcDataNode,
- stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy,
- allowLazyPersist, false, targetPinnings);
+ blockToken, clientname, targets, targetStorageTypes,
+ srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
+ allowLazyPersist, false, targetPinnings,
+ (targetStorageIds.length > 0 ? targetStorageIds[0] : null), targetStorageIds);
}
mirrorOut.flush();
@@ -929,17 +941,19 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes) throws IOException {
+ final StorageType[] targetStorageTypes,
+ final String[] targetStorageIds) throws IOException {
previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
getOutputStream());
checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
- BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
+ BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes,
+ targetStorageIds);
try {
datanode.transferReplicaForPipelineRecovery(blk, targets,
- targetStorageTypes, clientName);
+ targetStorageTypes, targetStorageIds, clientName);
writeResponse(Status.SUCCESS, null, out);
} catch (IOException ioe) {
LOG.info("transferBlock " + blk + " received exception " + ioe);
@@ -1105,12 +1119,14 @@ class DataXceiver extends Receiver implements Runnable {
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
- final DatanodeInfo proxySource) throws IOException {
+ final DatanodeInfo proxySource,
+ final String storageId) throws IOException {
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
checkAccess(replyOut, true, block, blockToken,
Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
- new StorageType[]{ storageType });
+ new StorageType[]{storageType},
+ new String[]{storageId});
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to receive block " + block.getBlockId() +
@@ -1131,7 +1147,7 @@ class DataXceiver extends Receiver implements Runnable {
// Move the block to different storage in the same datanode
if (proxySource.equals(datanode.getDatanodeId())) {
ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
- storageType);
+ storageType, storageId);
if (oldReplica != null) {
LOG.info("Moved " + block + " from StorageType "
+ oldReplica.getVolume().getStorageType() + " to " + storageType);
@@ -1188,7 +1204,7 @@ class DataXceiver extends Receiver implements Runnable {
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
- CachingStrategy.newDropBehind(), false, false));
+ CachingStrategy.newDropBehind(), false, false, storageId));
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,
@@ -1258,11 +1274,12 @@ class DataXceiver extends Receiver implements Runnable {
final DataNode dn, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
- final boolean pinning) throws IOException {
+ final boolean pinning,
+ final String storageId) throws IOException {
return new BlockReceiver(block, storageType, in,
inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, dn, requestedChecksum,
- cachingStrategy, allowLazyPersist, pinning);
+ cachingStrategy, allowLazyPersist, pinning, storageId);
}
/**
@@ -1365,7 +1382,7 @@ class DataXceiver extends Receiver implements Runnable {
private void checkAccess(OutputStream out, final boolean reply,
ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
BlockTokenIdentifier.AccessMode mode) throws IOException {
- checkAccess(out, reply, blk, t, op, mode, null);
+ checkAccess(out, reply, blk, t, op, mode, null, null);
}
private void checkAccess(OutputStream out, final boolean reply,
@@ -1373,7 +1390,8 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> t,
final Op op,
final BlockTokenIdentifier.AccessMode mode,
- final StorageType[] storageTypes) throws IOException {
+ final StorageType[] storageTypes,
+ final String[] storageIds) throws IOException {
checkAndWaitForBP(blk);
if (datanode.isBlockTokenEnabled) {
if (LOG.isDebugEnabled()) {
@@ -1382,7 +1400,7 @@ class DataXceiver extends Receiver implements Runnable {
}
try {
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
- storageTypes);
+ storageTypes, storageIds);
} catch(InvalidToken e) {
try {
if (reply) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 1492e5d..e076dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -111,7 +111,8 @@ public final class ErasureCodingWorker {
new StripedReconstructionInfo(
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
- reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
+ reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
+ reconInfo.getTargetStorageIDs());
final StripedBlockReconstructor task =
new StripedBlockReconstructor(this, stripedReconInfo);
if (task.hasValidTargets()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index b3884c2..39ef67e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -110,7 +110,7 @@ class StripedBlockReader {
stripedReader.getSocketAddress4Transfer(source);
Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
- StorageType.EMPTY_ARRAY);
+ StorageType.EMPTY_ARRAY, new String[0]);
/*
* This can be further improved if the replica is local, then we can
* read directly from DN and need to check the replica is FINALIZED
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index a6989d4..24c1d61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -61,6 +61,7 @@ class StripedBlockWriter {
private final ExtendedBlock block;
private final DatanodeInfo target;
private final StorageType storageType;
+ private final String storageId;
private Socket targetSocket;
private DataOutputStream targetOutputStream;
@@ -72,8 +73,8 @@ class StripedBlockWriter {
StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
Configuration conf, ExtendedBlock block,
- DatanodeInfo target, StorageType storageType)
- throws IOException {
+ DatanodeInfo target, StorageType storageType,
+ String storageId) throws IOException {
this.stripedWriter = stripedWriter;
this.datanode = datanode;
this.conf = conf;
@@ -81,6 +82,7 @@ class StripedBlockWriter {
this.block = block;
this.target = target;
this.storageType = storageType;
+ this.storageId = storageId;
this.targetBuffer = stripedWriter.allocateWriteBuffer();
@@ -117,7 +119,7 @@ class StripedBlockWriter {
Token<BlockTokenIdentifier> blockToken =
datanode.getBlockAccessToken(block,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- new StorageType[]{storageType});
+ new StorageType[]{storageType}, new String[]{storageId});
long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
@@ -141,7 +143,7 @@ class StripedBlockWriter {
new StorageType[]{storageType}, source,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(),
- false, false, null);
+ false, false, null, storageId, new String[]{storageId});
targetSocket = socket;
targetOutputStream = out;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
index a5c328b..a619c34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -40,24 +40,27 @@ public class StripedReconstructionInfo {
private final byte[] targetIndices;
private final DatanodeInfo[] targets;
private final StorageType[] targetStorageTypes;
+ private final String[] targetStorageIds;
public StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices) {
- this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null);
+ this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
+ null, null);
}
StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
- DatanodeInfo[] targets, StorageType[] targetStorageTypes) {
+ DatanodeInfo[] targets, StorageType[] targetStorageTypes,
+ String[] targetStorageIds) {
this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
- targetStorageTypes);
+ targetStorageTypes, targetStorageIds);
}
private StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices, DatanodeInfo[] targets,
- StorageType[] targetStorageTypes) {
+ StorageType[] targetStorageTypes, String[] targetStorageIds) {
this.blockGroup = blockGroup;
this.ecPolicy = ecPolicy;
@@ -66,6 +69,7 @@ public class StripedReconstructionInfo {
this.targetIndices = targetIndices;
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;
+ this.targetStorageIds = targetStorageIds;
}
ExtendedBlock getBlockGroup() {
@@ -95,5 +99,9 @@ public class StripedReconstructionInfo {
StorageType[] getTargetStorageTypes() {
return targetStorageTypes;
}
+
+ String[] getTargetStorageIds() {
+ return targetStorageIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index 225a7ed..762506c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -55,6 +55,7 @@ class StripedWriter {
private final short[] targetIndices;
private boolean hasValidTargets;
private final StorageType[] targetStorageTypes;
+ private final String[] targetStorageIds;
private StripedBlockWriter[] writers;
@@ -77,6 +78,8 @@ class StripedWriter {
assert targets != null;
this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
assert targetStorageTypes != null;
+ this.targetStorageIds = stripedReconInfo.getTargetStorageIds();
+ assert targetStorageIds != null;
writers = new StripedBlockWriter[targets.length];
@@ -192,7 +195,7 @@ class StripedWriter {
private StripedBlockWriter createWriter(short index) throws IOException {
return new StripedBlockWriter(this, datanode, conf,
reconstructor.getBlock(targetIndices[index]), targets[index],
- targetStorageTypes[index]);
+ targetStorageTypes[index], targetStorageIds[index]);
}
ByteBuffer allocateWriteBuffer() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index 39d9547..efe222f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -113,8 +113,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
new RoundRobinVolumeChoosingPolicy<V>();
@Override
- public V chooseVolume(List<V> volumes,
- long replicaSize) throws IOException {
+ public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+ throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
@@ -125,19 +125,20 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) {
- return doChooseVolume(volumes, replicaSize);
+ return doChooseVolume(volumes, replicaSize, storageId);
}
}
- private V doChooseVolume(final List<V> volumes,
- long replicaSize) throws IOException {
+ private V doChooseVolume(final List<V> volumes, long replicaSize,
+ String storageId) throws IOException {
AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes);
if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
// If they're actually not too far out of whack, fall back on pure round
// robin.
- V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
+ V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize,
+ storageId);
if (LOG.isDebugEnabled()) {
LOG.debug("All volumes are within the configured free space balance " +
"threshold. Selecting " + volume + " for write of block size " +
@@ -165,7 +166,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
if (mostAvailableAmongLowVolumes < replicaSize ||
random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
- highAvailableVolumes, replicaSize);
+ highAvailableVolumes, replicaSize, storageId);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
@@ -173,7 +174,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
}
} else {
volume = roundRobinPolicyLowAvailable.chooseVolume(
- lowAvailableVolumes, replicaSize);
+ lowAvailableVolumes, replicaSize, storageId);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
@@ -266,7 +267,8 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
/**
* Used so that we only check the available space on a given volume once, at
- * the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}.
+ * the beginning of
+ * {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}.
*/
private class AvailableSpaceVolumePair {
private final V volume;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 9e979f7..d7e29cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -318,7 +318,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- ReplicaHandler createTemporary(StorageType storageType,
+ ReplicaHandler createTemporary(StorageType storageType, String storageId,
ExtendedBlock b) throws IOException;
/**
@@ -328,7 +328,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- ReplicaHandler createRbw(StorageType storageType,
+ ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
@@ -623,7 +623,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Move block from one storage to another storage
*/
ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
- StorageType targetStorageType) throws IOException;
+ StorageType targetStorageType, String storageId) throws IOException;
/**
* Set a block to be pinned on this datanode so that it cannot be moved
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 9474b92..b9bcf1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -50,7 +50,7 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
}
@Override
- public V chooseVolume(final List<V> volumes, long blockSize)
+ public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
throws IOException {
if (volumes.size() < 1) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
index 62b1e75..8cbc058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java
@@ -36,8 +36,11 @@ public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
*
* @param volumes - a list of available volumes.
* @param replicaSize - the size of the replica for which a volume is sought.
+ * @param storageId - the storage id of the Volume nominated by the namenode.
+ * This can usually be ignored by the VolumeChoosingPolicy.
* @return the chosen volume.
* @throws IOException when disks are unavailable or are full.
*/
- public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
+ V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+ throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 169e0e6..9a5002a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -927,7 +927,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
- StorageType targetStorageType) throws IOException {
+ StorageType targetStorageType, String targetStorageId)
+ throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(block);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException(
@@ -952,7 +953,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
- volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
+ volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
+ block.getNumBytes());
}
try {
moveBlock(block, replicaInfo, volumeRef);
@@ -1298,11 +1300,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
}
-
+
@Override // FsDatasetSpi
public ReplicaHandler createRbw(
- StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
- throws IOException {
+ StorageType storageType, String storageId, ExtendedBlock b,
+ boolean allowLazyPersist) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
@@ -1335,7 +1337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (ref == null) {
- ref = volumes.getNextVolume(storageType, b.getNumBytes());
+ ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
}
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1503,7 +1505,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, String storageId, ExtendedBlock b)
+ throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
@@ -1516,7 +1519,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
}
FsVolumeReference ref =
- volumes.getNextVolume(storageType, b.getNumBytes());
+ volumes.getNextVolume(storageType, storageId, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try {
@@ -2899,7 +2902,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
replicaInfo.getVolume().isTransientStorage()) {
// Pick a target volume to persist the block.
targetReference = volumes.getNextVolume(
- StorageType.DEFAULT, replicaInfo.getNumBytes());
+ StorageType.DEFAULT, null, replicaInfo.getNumBytes());
targetVolume = (FsVolumeImpl) targetReference.getVolume();
ramDiskReplicaTracker.recordStartLazyPersist(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org