Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/12/15 18:39:29 UTC
[43/50] [abbrv] hadoop git commit: HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.
HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e075a61f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e075a61f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e075a61f
Branch: refs/heads/HDFS-9806
Commit: e075a61f10feea387071ffeeadf9d4666935b2b0
Parents: 298fda2
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Dec 8 14:52:48 2017 -0800
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Fri Dec 15 10:25:35 2017 -0800
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 30 +++++++++++-
.../blockmanagement/DatanodeStorageInfo.java | 11 +++--
.../blockmanagement/ProvidedStorageMap.java | 18 ++++++-
.../TestNameNodeProvidedImplementation.java | 49 ++++++++++++++++++--
4 files changed, 97 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
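For orientation before the diffs: a PROVIDED storage is one logical volume shared by every datanode that mounts the external store, so the block manager cannot use the storage object itself to name a concrete replication source. The sketch below condenses the flow this commit adds, using only methods that appear in the diffs that follow; it is an editorial summary, not code lifted verbatim from the commit.

    // Condensed sketch of the new source-selection flow for PROVIDED blocks:
    // pick a concrete datanode at random among those that reported a PROVIDED
    // volume, then re-wrap the shared storage so it points at that node.
    DatanodeDescriptor node =
        storage.getStorageType() == StorageType.PROVIDED
            ? providedStorageMap.chooseProvidedDatanode()
            : storage.getDatanodeDescriptor();
    if (storage.getStorageType() == StorageType.PROVIDED) {
      storage = new DatanodeStorageInfo(node, storage.getStorageID(),
          storage.getStorageType(), storage.getState());
    }
    nodesContainingLiveReplicas.add(storage);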
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e075a61f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 916cbaa..c1cd4db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2151,6 +2151,22 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+ * Get the associated {@link DatanodeDescriptor} for the storage.
+ * If the storage is of type PROVIDED, one of the nodes that reported
+ * PROVIDED storage is returned. Otherwise, this is equivalent to
+ * {@code storage.getDatanodeDescriptor()}.
+ * @param storage the storage to resolve the datanode for.
+ * @return the associated {@link DatanodeDescriptor}.
+ */
+ private DatanodeDescriptor getDatanodeDescriptorFromStorage(
+ DatanodeStorageInfo storage) {
+ if (storage.getStorageType() == StorageType.PROVIDED) {
+ return providedStorageMap.chooseProvidedDatanode();
+ }
+ return storage.getDatanodeDescriptor();
+ }
+
+ /**
* Parse the data-nodes the block belongs to and choose a certain number
* from them to be the recovery sources.
*
@@ -2198,10 +2214,14 @@ public class BlockManager implements BlockStatsMXBean {
BitSet bitSet = isStriped ?
new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
- final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+ final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
storage, corruptReplicas.getNodes(block), false);
if (state == StoredReplicaState.LIVE) {
+ if (storage.getStorageType() == StorageType.PROVIDED) {
+ storage = new DatanodeStorageInfo(node, storage.getStorageID(),
+ storage.getStorageType(), storage.getState());
+ }
nodesContainingLiveReplicas.add(storage);
}
containingNodes.add(node);
@@ -4338,7 +4358,13 @@ public class BlockManager implements BlockStatsMXBean {
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(storedBlock);
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
- final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+ if (storage.getStorageType() == StorageType.PROVIDED
+ && storage.getState() == State.NORMAL) {
+ // assume the policy is satisfied for blocks on PROVIDED storage
+ // as long as the storage is in normal state.
+ return true;
+ }
+ final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
// Nodes under maintenance should be counted as valid replicas from
// rack policy point of view.
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
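The second BlockManager hunk, in the block-placement check, deserves a note: a PROVIDED replica in NORMAL state can be served by any datanode that mounts the provided store, so rack diversity is moot for it. A reduced sketch of the check, assuming (as the hunk context suggests) that it lives inside a boolean method that reports whether the placement policy is satisfied:

    // Reduced sketch: a healthy PROVIDED replica trivially satisfies rack
    // placement, since every datanode mounting the provided store can serve
    // it; other replicas fall through to the usual per-rack counting.
    for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
      if (storage.getStorageType() == StorageType.PROVIDED
          && storage.getState() == State.NORMAL) {
        return true;
      }
      // ... per-rack counting for non-PROVIDED replicas ...
    }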
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e075a61f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 76bf915..3a56ef1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -120,10 +120,15 @@ public class DatanodeStorageInfo {
private boolean blockContentsStale = true;
DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
+ this(dn, s.getStorageID(), s.getStorageType(), s.getState());
+ }
+
+ DatanodeStorageInfo(DatanodeDescriptor dn, String storageID,
+ StorageType storageType, State state) {
this.dn = dn;
- this.storageID = s.getStorageID();
- this.storageType = s.getStorageType();
- this.state = s.getState();
+ this.storageID = storageID;
+ this.storageType = storageType;
+ this.state = state;
}
public int getBlockReportCount() {
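The new delegating constructor exists so a caller can re-home a storage onto a different datanode, as the BlockManager hunk above does for PROVIDED replicas. A hedged usage sketch; the variables "shared" and "chosen" are illustrative, not from the commit:

    // Clone the shared PROVIDED storage so it reports the chosen datanode;
    // 'chosen' is assumed to come from
    // ProvidedStorageMap#chooseProvidedDatanode().
    DatanodeStorageInfo perNode = new DatanodeStorageInfo(chosen,
        shared.getStorageID(), shared.getStorageType(), shared.getState());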
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e075a61f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 208ed3e..08d1434 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -192,7 +192,7 @@ public class ProvidedStorageMap {
}
public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
- if (providedEnabled && storageId.equals(storage.getStorageID())) {
+ if (isProvidedStorage(storage.getStorageID())) {
if (StorageType.PROVIDED.equals(storage.getStorageType())) {
node.injectStorage(providedStorageInfo);
return;
@@ -204,6 +204,22 @@ public class ProvidedStorageMap {
node.updateStorage(storage);
}
+ private boolean isProvidedStorage(String dnStorageId) {
+ return providedEnabled && storageId.equals(dnStorageId);
+ }
+
+ /**
+ * Choose a datanode that reported a volume of {@link StorageType} PROVIDED.
+ *
+ * @return the {@link DatanodeDescriptor} corresponding to a datanode that
+ * reported a volume with {@link StorageType} PROVIDED. If multiple
+ * datanodes report a PROVIDED volume, one is chosen uniformly at
+ * random.
+ */
+ public DatanodeDescriptor chooseProvidedDatanode() {
+ return providedDescriptor.chooseRandom();
+ }
+
/**
* Builder used for creating {@link LocatedBlocks} when a block is provided.
*/
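For context, updateStorage() now routes any storage report carrying the configured provided storage ID through the shared providedStorageInfo. A condensed sketch of the resulting flow; the real method above handles a report whose ID matches but whose type is not PROVIDED separately before falling back:

    // Condensed flow after this change: a report whose storage ID matches
    // the configured provided storage ID injects the shared PROVIDED
    // storage; everything else is handled as a normal per-node storage.
    if (isProvidedStorage(storage.getStorageID())
        && StorageType.PROVIDED.equals(storage.getStorageType())) {
      node.injectStorage(providedStorageInfo);
    } else {
      node.updateStorage(storage);
    }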
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e075a61f/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index 394e8d8..2917a34 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAl
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -201,8 +202,15 @@ public class TestNameNodeProvidedImplementation {
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
- boolean doFormat)
- throws IOException {
+ boolean doFormat) throws IOException {
+ startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode,
+ doFormat, null);
+ }
+
+ void startCluster(Path nspath, int numDatanodes,
+ StorageType[] storageTypes,
+ StorageType[][] storageTypesPerDatanode,
+ boolean doFormat, String[] racks) throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
if (storageTypesPerDatanode != null) {
@@ -211,6 +219,7 @@ public class TestNameNodeProvidedImplementation {
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.storageTypes(storageTypesPerDatanode)
+ .racks(racks)
.build();
} else if (storageTypes != null) {
cluster = new MiniDFSCluster.Builder(conf)
@@ -219,12 +228,14 @@ public class TestNameNodeProvidedImplementation {
.numDataNodes(numDatanodes)
.storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes)
+ .racks(racks)
.build();
} else {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
+ .racks(racks)
.build();
}
cluster.waitActive();
@@ -515,11 +526,12 @@ public class TestNameNodeProvidedImplementation {
StorageType.PROVIDED, StorageType.DISK},
null,
false);
+ setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+ }
- String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
+ private void setAndUnsetReplication(String filename) throws Exception {
Path file = new Path(filename);
FileSystem fs = cluster.getFileSystem();
-
// set the replication to 4, and test that the file has
// the required replication.
short newReplication = 4;
@@ -833,7 +845,7 @@ public class TestNameNodeProvidedImplementation {
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
- int fileIndex = numFiles -1;
+ int fileIndex = numFiles - 1;
final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final DatanodeManager dnm = blockManager.getDatanodeManager();
@@ -890,4 +902,31 @@ public class TestNameNodeProvidedImplementation {
// reports all 3 replicas
verifyFileLocation(fileIndex, 3);
}
+
+ @Test
+ public void testProvidedWithHierarchicalTopology() throws Exception {
+ conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+ UGIResolver.class);
+ String packageName = "org.apache.hadoop.hdfs.server.blockmanagement";
+ String[] policies = new String[] {
+ "BlockPlacementPolicyDefault",
+ "BlockPlacementPolicyRackFaultTolerant",
+ "BlockPlacementPolicyWithNodeGroup",
+ "BlockPlacementPolicyWithUpgradeDomain"};
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockResolver.class);
+ String[] racks =
+ {"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
+ "/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1" };
+ for (String policy: policies) {
+ LOG.info("Using policy: " + packageName + "." + policy);
+ conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
+ startCluster(NNDIRPATH, racks.length,
+ new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+ null, false, racks);
+ verifyFileSystemContents();
+ setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+ cluster.shutdown();
+ }
+ }
}
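To exercise just the new test, a standard Maven Surefire invocation from the source root should work (standard -pl and -Dtest flags; the exact form depends on the build environment):

    mvn -pl hadoop-tools/hadoop-fs2img test \
        -Dtest=TestNameNodeProvidedImplementation#testProvidedWithHierarchicalTopology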