Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/19 03:28:27 UTC
[18/51] [abbrv] hadoop git commit: HDFS-12778. [READ] Report multiple locations for PROVIDED blocks
HDFS-12778. [READ] Report multiple locations for PROVIDED blocks
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d3be87e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d3be87e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d3be87e
Branch: refs/heads/HDFS-7240
Commit: 3d3be87e301d9f8ab1a220bc5dbeae0f032c5a86
Parents: 3b1d303
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Tue Nov 21 14:54:57 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 15 17:51:39 2017 -0800
----------------------------------------------------------------------
.../blockmanagement/ProvidedStorageMap.java | 149 +++++++------------
.../server/namenode/FixedBlockResolver.java | 3 +-
.../TestNameNodeProvidedImplementation.java | 127 +++++++++++-----
3 files changed, 151 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
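In short: instead of patching a single "shadow" datanode into each LocatedBlock
at build() time, the builder now resolves every PROVIDED replica to a concrete
provided datanode as the block is created, and pads the location list up to the
default replication (bounded by the number of active provided datanodes). A
condensed, hypothetical sketch of that expansion, with datanodes reduced to
UUID strings and a pluggable chooser standing in for ProvidedDescriptor.choose():

  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import java.util.function.Function;

  public class ProvidedLocationExpansion {
    // Mirrors ProvidedBlocksBuilder.newLocatedBlock: keep the reported
    // locations, then append distinct PROVIDED locations until we reach
    // min(defaultReplication, activeProvidedDatanodes).
    static List<String> expandLocations(List<String> reportedUuids,
        int defaultReplication, int activeProvided,
        Function<Set<String>, String> chooseProvided) {
      List<String> locs = new ArrayList<>(reportedUuids);
      Set<String> excluded = new HashSet<>(reportedUuids);
      for (int count = locs.size() + 1;
           count <= defaultReplication && count <= activeProvided; count++) {
        String uuid = chooseProvided.apply(excluded); // an unused provided DN
        locs.add(uuid);
        excluded.add(uuid);
      }
      return locs;
    }
  }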
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d3be87e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 2bc8faa..6fec977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,6 +71,7 @@ public class ProvidedStorageMap {
private final DatanodeStorageInfo providedStorageInfo;
private boolean providedEnabled;
private long capacity;
+ private int defaultReplication;
ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
throws IOException {
@@ -95,6 +95,8 @@ public class ProvidedStorageMap {
storageId, State.NORMAL, StorageType.PROVIDED);
providedDescriptor = new ProvidedDescriptor();
providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
+ this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.bm = bm;
this.lock = lock;
@@ -198,63 +200,72 @@ public class ProvidedStorageMap {
*/
class ProvidedBlocksBuilder extends LocatedBlockBuilder {
- private ShadowDatanodeInfoWithStorage pending;
- private boolean hasProvidedLocations;
-
ProvidedBlocksBuilder(int maxBlocks) {
super(maxBlocks);
- pending = new ShadowDatanodeInfoWithStorage(
- providedDescriptor, storageId);
- hasProvidedLocations = false;
+ }
+
+ private DatanodeDescriptor chooseProvidedDatanode(
+ Set<String> excludedUUids) {
+ DatanodeDescriptor dn = providedDescriptor.choose(null, excludedUUids);
+ if (dn == null) {
+ dn = providedDescriptor.choose(null);
+ }
+ return dn;
}
@Override
LocatedBlock newLocatedBlock(ExtendedBlock eb,
DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
- DatanodeInfoWithStorage[] locs =
- new DatanodeInfoWithStorage[storages.length];
- String[] sids = new String[storages.length];
- StorageType[] types = new StorageType[storages.length];
+ List<DatanodeInfoWithStorage> locs = new ArrayList<>();
+ List<String> sids = new ArrayList<>();
+ List<StorageType> types = new ArrayList<>();
+ boolean isProvidedBlock = false;
+ Set<String> excludedUUids = new HashSet<>();
+
for (int i = 0; i < storages.length; ++i) {
- sids[i] = storages[i].getStorageID();
- types[i] = storages[i].getStorageType();
- if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
- locs[i] = pending;
- hasProvidedLocations = true;
+ DatanodeStorageInfo currInfo = storages[i];
+ StorageType storageType = currInfo.getStorageType();
+ sids.add(currInfo.getStorageID());
+ types.add(storageType);
+ if (StorageType.PROVIDED.equals(storageType)) {
+ DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+ locs.add(
+ new DatanodeInfoWithStorage(
+ dn, currInfo.getStorageID(), currInfo.getStorageType()));
+ excludedUUids.add(dn.getDatanodeUuid());
+ isProvidedBlock = true;
} else {
- locs[i] = new DatanodeInfoWithStorage(
- storages[i].getDatanodeDescriptor(), sids[i], types[i]);
+ locs.add(new DatanodeInfoWithStorage(
+ currInfo.getDatanodeDescriptor(),
+ currInfo.getStorageID(), storageType));
+ excludedUUids.add(currInfo.getDatanodeDescriptor().getDatanodeUuid());
}
}
- return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
- }
- @Override
- LocatedBlocks build(DatanodeDescriptor client) {
- // TODO: to support multiple provided storages, need to pass/maintain map
- if (hasProvidedLocations) {
- // set all fields of pending DatanodeInfo
- List<String> excludedUUids = new ArrayList<String>();
- for (LocatedBlock b : blocks) {
- DatanodeInfo[] infos = b.getLocations();
- StorageType[] types = b.getStorageTypes();
-
- for (int i = 0; i < types.length; i++) {
- if (!StorageType.PROVIDED.equals(types[i])) {
- excludedUUids.add(infos[i].getDatanodeUuid());
- }
- }
+ int numLocations = locs.size();
+ if (isProvidedBlock) {
+ // add more replicas until we reach the defaultReplication
+ for (int count = numLocations + 1;
+ count <= defaultReplication && count <= providedDescriptor
+ .activeProvidedDatanodes(); count++) {
+ DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+ locs.add(new DatanodeInfoWithStorage(
+ dn, storageId, StorageType.PROVIDED));
+ sids.add(storageId);
+ types.add(StorageType.PROVIDED);
+ excludedUUids.add(dn.getDatanodeUuid());
}
-
- DatanodeDescriptor dn =
- providedDescriptor.choose(client, excludedUUids);
- if (dn == null) {
- dn = providedDescriptor.choose(client);
- }
- pending.replaceInternal(dn);
}
+ return new LocatedBlock(eb,
+ locs.toArray(new DatanodeInfoWithStorage[locs.size()]),
+ sids.toArray(new String[sids.size()]),
+ types.toArray(new StorageType[types.size()]),
+ pos, isCorrupt, null);
+ }
+ @Override
+ LocatedBlocks build(DatanodeDescriptor client) {
return new LocatedBlocks(
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
}
@@ -266,53 +277,6 @@ public class ProvidedStorageMap {
}
/**
- * An abstract {@link DatanodeInfoWithStorage} to represent provided storage.
- */
- static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
- private String shadowUuid;
-
- ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
- super(d, storageId, StorageType.PROVIDED);
- }
-
- @Override
- public String getDatanodeUuid() {
- return shadowUuid;
- }
-
- public void setDatanodeUuid(String uuid) {
- shadowUuid = uuid;
- }
-
- void replaceInternal(DatanodeDescriptor dn) {
- updateRegInfo(dn); // overwrite DatanodeID (except UUID)
- setDatanodeUuid(dn.getDatanodeUuid());
- setCapacity(dn.getCapacity());
- setDfsUsed(dn.getDfsUsed());
- setRemaining(dn.getRemaining());
- setBlockPoolUsed(dn.getBlockPoolUsed());
- setCacheCapacity(dn.getCacheCapacity());
- setCacheUsed(dn.getCacheUsed());
- setLastUpdate(dn.getLastUpdate());
- setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
- setXceiverCount(dn.getXceiverCount());
- setNetworkLocation(dn.getNetworkLocation());
- adminState = dn.getAdminState();
- setUpgradeDomain(dn.getUpgradeDomain());
- }
-
- @Override
- public boolean equals(Object obj) {
- return super.equals(obj);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
- }
-
- /**
* An abstract DatanodeDescriptor to track datanodes with provided storages.
* NOTE: never resolved through registerDatanode, so not in the topology.
*/
@@ -336,6 +300,7 @@ public class ProvidedStorageMap {
DatanodeStorageInfo getProvidedStorage(
DatanodeDescriptor dn, DatanodeStorage s) {
+ LOG.info("Adding datanode " + dn.getDatanodeUuid() + " as a provided datanode");
dns.put(dn.getDatanodeUuid(), dn);
// TODO: maintain separate RPC ident per dn
return storageMap.get(s.getStorageID());
@@ -352,7 +317,7 @@ public class ProvidedStorageMap {
DatanodeDescriptor choose(DatanodeDescriptor client) {
// exact match for now
DatanodeDescriptor dn = client != null ?
- dns.get(client.getDatanodeUuid()) : null;
+ dns.get(client.getDatanodeUuid()) : null;
if (null == dn) {
dn = chooseRandom();
}
@@ -360,10 +325,10 @@ public class ProvidedStorageMap {
}
DatanodeDescriptor choose(DatanodeDescriptor client,
- List<String> excludedUUids) {
+ Set<String> excludedUUids) {
// exact match for now
DatanodeDescriptor dn = client != null ?
- dns.get(client.getDatanodeUuid()) : null;
+ dns.get(client.getDatanodeUuid()) : null;
if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
dn = null;
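For context: choose(client, excludedUUids) above first tries to honor the
exclusion set and falls back to choose(client) only when every provided
datanode is excluded, so a PROVIDED block is never left without a location
while any provided datanode is alive. A hypothetical, standalone restatement
of that contract, with datanodes again reduced to UUID strings:

  import java.util.Set;

  class ProvidedChooserSketch {
    // Prefer a provided datanode outside the exclusion set; if all are
    // excluded, reuse one rather than leave the block without a location.
    static String choose(Set<String> providedUuids, Set<String> excluded) {
      for (String uuid : providedUuids) {
        if (!excluded.contains(uuid)) {
          return uuid;
        }
      }
      return providedUuids.isEmpty() ? null
          : providedUuids.iterator().next();
    }
  }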
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d3be87e/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
index 8ff9695..4b3a01f 100644
--- a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
+++ b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/FixedBlockResolver.java
@@ -34,6 +34,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
"hdfs.image.writer.resolver.fixed.block.size";
public static final String START_BLOCK =
"hdfs.image.writer.resolver.fixed.block.start";
+ public static final long BLOCKSIZE_DEFAULT = 256 * (1L << 20);
private Configuration conf;
private long blocksize = 256 * (1L << 20);
@@ -42,7 +43,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
- blocksize = conf.getLong(BLOCKSIZE, 256 * (1L << 20));
+ blocksize = conf.getLong(BLOCKSIZE, BLOCKSIZE_DEFAULT);
blockIds.set(conf.getLong(START_BLOCK, (1L << 30)));
}
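The FixedBlockResolver change is mechanical: the 256 MB default becomes the
public constant BLOCKSIZE_DEFAULT so callers can compute expected block counts
without repeating the literal. A minimal usage sketch (assuming the resolver's
implicit no-arg constructor):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.server.namenode.FixedBlockResolver;

  public class FixedBlockResolverExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // override the fixed block size; when unset, setConf falls back to
      // FixedBlockResolver.BLOCKSIZE_DEFAULT (256 MB)
      conf.setLong(FixedBlockResolver.BLOCKSIZE, 64L << 20);
      FixedBlockResolver resolver = new FixedBlockResolver();
      resolver.setConf(conf);
    }
  }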
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d3be87e/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index f6d38f6..9c82967 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -474,12 +474,12 @@ public class TestNameNodeProvidedImplementation {
}
private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
- String filename, int expectedLocations) throws IOException {
- LocatedBlocks locatedBlocks = client.getLocatedBlocks(
- filename, 0, baseFileLen);
- //given the start and length in the above call,
- //only one LocatedBlock in LocatedBlocks
- assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+ String filename, long fileLen, long expectedBlocks, int expectedLocations)
+ throws IOException {
+ LocatedBlocks locatedBlocks = client.getLocatedBlocks(filename, 0, fileLen);
+ // given the start and length in the above call, the number of
+ // LocatedBlocks returned should equal expectedBlocks
+ assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
assertEquals(expectedLocations, locatedBlock.getLocations().length);
return locatedBlock.getLocations();
@@ -513,17 +513,20 @@ public class TestNameNodeProvidedImplementation {
file, newReplication, 10000);
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0));
- getAndCheckBlockLocations(client, filename, newReplication);
+ getAndCheckBlockLocations(client, filename, baseFileLen, 1, newReplication);
// set the replication back to 1
newReplication = 1;
LOG.info("Setting replication of file {} back to {}",
filename, newReplication);
fs.setReplication(file, newReplication);
+ // the number of replicas reported should now equal defaultReplication
+ int defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
- file, newReplication, 10000);
- // the only replica left should be the PROVIDED datanode
- getAndCheckBlockLocations(client, filename, newReplication);
+ file, (short) defaultReplication, 10000);
+ getAndCheckBlockLocations(client, filename, baseFileLen, 1,
+ defaultReplication);
}
@Test(timeout=30000)
@@ -545,8 +548,9 @@ public class TestNameNodeProvidedImplementation {
if (numFiles >= 1) {
String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
-
- DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1);
+ // 2 locations returned as there are 2 PROVIDED datanodes
+ DatanodeInfo[] dnInfos =
+ getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
//the location should be one of the provided DNs available
assertTrue(
dnInfos[0].getDatanodeUuid().equals(
@@ -564,7 +568,7 @@ public class TestNameNodeProvidedImplementation {
providedDatanode1.getDatanodeId().getXferAddr());
//should find the block on the 2nd provided datanode
- dnInfos = getAndCheckBlockLocations(client, filename, 1);
+ dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
assertEquals(providedDatanode2.getDatanodeUuid(),
dnInfos[0].getDatanodeUuid());
@@ -575,14 +579,14 @@ public class TestNameNodeProvidedImplementation {
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(),
providedDatanode2.getDatanodeId().getXferAddr());
- getAndCheckBlockLocations(client, filename, 0);
+ getAndCheckBlockLocations(client, filename, baseFileLen, 1, 0);
//restart the provided datanode
cluster.restartDataNode(providedDNProperties1, true);
cluster.waitActive();
//should find the block on the 1st provided datanode now
- dnInfos = getAndCheckBlockLocations(client, filename, 1);
+ dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
//not comparing UUIDs as the datanode can now have a different one.
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
dnInfos[0].getXferAddr());
@@ -593,20 +597,18 @@ public class TestNameNodeProvidedImplementation {
public void testTransientDeadDatanodes() throws Exception {
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
- // 2 Datanodes, 1 PROVIDED and other DISK
- startCluster(NNDIRPATH, 2, null,
+ // 3 Datanodes, 2 with PROVIDED storage and 1 with DISK only
+ startCluster(NNDIRPATH, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
+ {StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
DataNode providedDatanode = cluster.getDataNodes().get(0);
-
- DFSClient client = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), cluster.getConfiguration(0));
-
for (int i= 0; i < numFiles; i++) {
- verifyFileLocation(i);
+ // expect to have 2 locations as we have 2 provided Datanodes.
+ verifyFileLocation(i, 2);
// NameNode thinks the datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(),
@@ -614,7 +616,7 @@ public class TestNameNodeProvidedImplementation {
cluster.waitActive();
cluster.triggerHeartbeats();
Thread.sleep(1000);
- verifyFileLocation(i);
+ verifyFileLocation(i, 2);
}
}
@@ -622,17 +624,18 @@ public class TestNameNodeProvidedImplementation {
public void testNamenodeRestart() throws Exception {
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
- // 2 Datanodes, 1 PROVIDED and other DISK
- startCluster(NNDIRPATH, 2, null,
+ // 3 Datanodes, 2 with PROVIDED storage and 1 with DISK only
+ startCluster(NNDIRPATH, 3, null,
new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
+ {StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}},
false);
- verifyFileLocation(numFiles - 1);
+ verifyFileLocation(numFiles - 1, 2);
cluster.restartNameNodes();
cluster.waitActive();
- verifyFileLocation(numFiles - 1);
+ verifyFileLocation(numFiles - 1, 2);
}
/**
@@ -640,18 +643,21 @@ public class TestNameNodeProvidedImplementation {
* @param fileIndex the index of the file to verify.
+ * @param replication the expected number of locations for each block.
* @throws Exception
*/
- private void verifyFileLocation(int fileIndex)
+ private void verifyFileLocation(int fileIndex, int replication)
throws Exception {
- DataNode providedDatanode = cluster.getDataNodes().get(0);
DFSClient client = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0));
- if (fileIndex <= numFiles && fileIndex >= 0) {
- String filename = "/" + filePrefix + fileIndex + fileSuffix;
- DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1);
- // location should be the provided DN
- assertEquals(providedDatanode.getDatanodeUuid(),
- dnInfos[0].getDatanodeUuid());
+ if (fileIndex < numFiles && fileIndex >= 0) {
+ String filename = filePrefix + fileIndex + fileSuffix;
+ File file = new File(new Path(NAMEPATH, filename).toUri());
+ long fileLen = file.length();
+ long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
+ FixedBlockResolver.BLOCKSIZE_DEFAULT);
+ long numLocatedBlocks =
+ fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize);
+ getAndCheckBlockLocations(client, "/" + filename, fileLen,
+ numLocatedBlocks, replication);
}
}
@@ -669,4 +675,55 @@ public class TestNameNodeProvidedImplementation {
NameNode nn = cluster.getNameNode();
assertEquals(clusterID, nn.getNamesystem().getClusterId());
}
+
+ @Test(timeout=30000)
+ public void testNumberOfProvidedLocations() throws Exception {
+ // set default replication to 4
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockResolver.class);
+ // start 4 datanodes, each with PROVIDED and DISK storage
+ startCluster(NNDIRPATH, 4,
+ new StorageType[]{
+ StorageType.PROVIDED, StorageType.DISK},
+ null,
+ false);
+ int expectedLocations = 4;
+ for (int i = 0; i < numFiles; i++) {
+ verifyFileLocation(i, expectedLocations);
+ }
+ // stop 2 datanodes, one after the other and verify number of locations.
+ for (int i = 1; i <= 2; i++) {
+ DataNode dn = cluster.getDataNodes().get(0);
+ cluster.stopDataNode(0);
+ // make NameNode detect that datanode is down
+ BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
+ dn.getDatanodeId().getXferAddr());
+
+ expectedLocations = 4 - i;
+ for (int j = 0; j < numFiles; j++) {
+ verifyFileLocation(j, expectedLocations);
+ }
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
+ // shrink the block size so that each file has at least 10 blocks
+ conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen / 10);
+ // set default replication to 4
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockResolver.class);
+ // start 4 datanodes, each with PROVIDED and DISK storage
+ startCluster(NNDIRPATH, 4,
+ new StorageType[]{
+ StorageType.PROVIDED, StorageType.DISK},
+ null,
+ false);
+ int expectedLocations = 4;
+ for (int i = 0; i < numFiles; i++) {
+ verifyFileLocation(i, expectedLocations);
+ }
+ }
}
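As a sanity check on the arithmetic in the "many blocks" test above: with
BLOCKSIZE set to baseFileLen / 10, verifyFileLocation expects
ceil(fileLen / blockSize) located blocks per file, each carrying 4 locations
(the configured default replication, matched by the 4 provided datanodes).
An illustrative computation, with a made-up file length:

  public class ExpectedBlocksExample {
    public static void main(String[] args) {
      long fileLen = 10_240L;        // stand-in for baseFileLen
      long blockSize = fileLen / 10; // as set via FixedBlockResolver.BLOCKSIZE
      long expectedBlocks =
          fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize);
      System.out.println(expectedBlocks + " blocks x 4 locations each");
    }
  }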