Posted to common-commits@hadoop.apache.org by kk...@apache.org on 2017/12/19 00:09:26 UTC

[35/50] [abbrv] hadoop git commit: HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.

HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c89b29bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c89b29bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c89b29bd

Branch: refs/heads/YARN-6592
Commit: c89b29bd421152f0e7e16936f18d9e852895c37a
Parents: 0f6aa95
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Dec 8 14:52:48 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 15 17:51:41 2017 -0800

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 30 +++++++++++-
 .../blockmanagement/DatanodeStorageInfo.java    | 11 +++--
 .../blockmanagement/ProvidedStorageMap.java     | 18 ++++++-
 .../TestNameNodeProvidedImplementation.java     | 49 ++++++++++++++++++--
 4 files changed, 97 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
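For context before the diff: the core change is in how a replication source is
chosen for a block on PROVIDED storage. Such a replica is not tied to a single
datanode, so the block manager now resolves it to one of the datanodes that
reported a PROVIDED volume, picked at random. Below is a minimal,
self-contained sketch of that selection logic, not the patch itself; all types
are simplified stand-ins and only the method name mirrors the diff.

// Sketch only: simplified stand-ins for the Hadoop types this patch touches.
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ProvidedSourceSketch {

  enum StorageType { DISK, PROVIDED }

  static class DatanodeDescriptor {
    final String name;
    DatanodeDescriptor(String name) { this.name = name; }
  }

  static class DatanodeStorageInfo {
    final DatanodeDescriptor dn;
    final StorageType type;
    DatanodeStorageInfo(DatanodeDescriptor dn, StorageType type) {
      this.dn = dn;
      this.type = type;
    }
    StorageType getStorageType() { return type; }
    DatanodeDescriptor getDatanodeDescriptor() { return dn; }
  }

  // Stand-in for ProvidedStorageMap#chooseProvidedDatanode: the set of
  // datanodes that reported a PROVIDED volume.
  static final List<DatanodeDescriptor> PROVIDED_NODES = Arrays.asList(
      new DatanodeDescriptor("dn0"), new DatanodeDescriptor("dn1"));
  static final Random RAND = new Random();

  // Mirrors the new BlockManager#getDatanodeDescriptorFromStorage: for
  // PROVIDED storage, pick one of the reporting datanodes at random;
  // otherwise use the storage's own datanode.
  static DatanodeDescriptor getDatanodeDescriptorFromStorage(
      DatanodeStorageInfo storage) {
    if (storage.getStorageType() == StorageType.PROVIDED) {
      return PROVIDED_NODES.get(RAND.nextInt(PROVIDED_NODES.size()));
    }
    return storage.getDatanodeDescriptor();
  }

  public static void main(String[] args) {
    DatanodeStorageInfo provided =
        new DatanodeStorageInfo(null, StorageType.PROVIDED);
    System.out.println("replication source: "
        + getDatanodeDescriptorFromStorage(provided).name);
  }
}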


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 916cbaa..c1cd4db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2151,6 +2151,22 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Get the associated {@link DatanodeDescriptor} for the storage.
+   * If the storage is of type PROVIDED, one of the nodes that reported
+   * PROVIDED storage is returned. If not, this is equivalent to
+   * {@code storage.getDatanodeDescriptor()}.
+   * @param storage the storage to find the datanode descriptor for.
+   * @return the associated {@link DatanodeDescriptor}.
+   */
+  private DatanodeDescriptor getDatanodeDescriptorFromStorage(
+      DatanodeStorageInfo storage) {
+    if (storage.getStorageType() == StorageType.PROVIDED) {
+      return providedStorageMap.chooseProvidedDatanode();
+    }
+    return storage.getDatanodeDescriptor();
+  }
+
+  /**
    * Parse the data-nodes the block belongs to and choose a certain number
    * from them to be the recovery sources.
    *
@@ -2198,10 +2214,14 @@ public class BlockManager implements BlockStatsMXBean {
     BitSet bitSet = isStriped ?
         new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
       final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
           storage, corruptReplicas.getNodes(block), false);
       if (state == StoredReplicaState.LIVE) {
+        if (storage.getStorageType() == StorageType.PROVIDED) {
+          storage = new DatanodeStorageInfo(node, storage.getStorageID(),
+              storage.getStorageType(), storage.getState());
+        }
         nodesContainingLiveReplicas.add(storage);
       }
       containingNodes.add(node);
@@ -4338,7 +4358,13 @@ public class BlockManager implements BlockStatsMXBean {
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
-      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      if (storage.getStorageType() == StorageType.PROVIDED
+          && storage.getState() == State.NORMAL) {
+        // assume the policy is satisfied for blocks on PROVIDED storage
+        // as long as the storage is in normal state.
+        return true;
+      }
+      final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
       // Nodes under maintenance should be counted as valid replicas from
       // rack policy point of view.
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 76bf915..3a56ef1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -120,10 +120,15 @@ public class DatanodeStorageInfo {
   private boolean blockContentsStale = true;
 
   DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
+    this(dn, s.getStorageID(), s.getStorageType(), s.getState());
+  }
+
+  DatanodeStorageInfo(DatanodeDescriptor dn, String storageID,
+      StorageType storageType, State state) {
     this.dn = dn;
-    this.storageID = s.getStorageID();
-    this.storageType = s.getStorageType();
-    this.state = s.getState();
+    this.storageID = storageID;
+    this.storageType = storageType;
+    this.state = state;
   }
 
   public int getBlockReportCount() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 208ed3e..08d1434 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -192,7 +192,7 @@ public class ProvidedStorageMap {
   }
 
   public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
-    if (providedEnabled && storageId.equals(storage.getStorageID())) {
+    if (isProvidedStorage(storage.getStorageID())) {
       if (StorageType.PROVIDED.equals(storage.getStorageType())) {
         node.injectStorage(providedStorageInfo);
         return;
@@ -204,6 +204,22 @@ public class ProvidedStorageMap {
     node.updateStorage(storage);
   }
 
+  private boolean isProvidedStorage(String dnStorageId) {
+    return providedEnabled && storageId.equals(dnStorageId);
+  }
+
+  /**
+   * Choose a datanode that reported a volume of {@link StorageType} PROVIDED.
+   *
+   * @return the {@link DatanodeDescriptor} corresponding to a datanode that
+   *         reported a volume with {@link StorageType} PROVIDED. If multiple
+   *         datanodes report a PROVIDED volume, one is chosen uniformly at
+   *         random.
+   */
+  public DatanodeDescriptor chooseProvidedDatanode() {
+    return providedDescriptor.chooseRandom();
+  }
+
   /**
    * Builder used for creating {@link LocatedBlocks} when a block is provided.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index 394e8d8..2917a34 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAl
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -201,8 +202,15 @@ public class TestNameNodeProvidedImplementation {
   void startCluster(Path nspath, int numDatanodes,
       StorageType[] storageTypes,
       StorageType[][] storageTypesPerDatanode,
-      boolean doFormat)
-      throws IOException {
+      boolean doFormat) throws IOException {
+    startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode,
+        doFormat, null);
+  }
+
+  void startCluster(Path nspath, int numDatanodes,
+      StorageType[] storageTypes,
+      StorageType[][] storageTypesPerDatanode,
+      boolean doFormat, String[] racks) throws IOException {
     conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
 
     if (storageTypesPerDatanode != null) {
@@ -211,6 +219,7 @@ public class TestNameNodeProvidedImplementation {
           .manageNameDfsDirs(doFormat)
           .numDataNodes(numDatanodes)
           .storageTypes(storageTypesPerDatanode)
+          .racks(racks)
           .build();
     } else if (storageTypes != null) {
       cluster = new MiniDFSCluster.Builder(conf)
@@ -219,12 +228,14 @@ public class TestNameNodeProvidedImplementation {
           .numDataNodes(numDatanodes)
           .storagesPerDatanode(storageTypes.length)
           .storageTypes(storageTypes)
+          .racks(racks)
           .build();
     } else {
       cluster = new MiniDFSCluster.Builder(conf)
           .format(doFormat)
           .manageNameDfsDirs(doFormat)
           .numDataNodes(numDatanodes)
+          .racks(racks)
           .build();
     }
     cluster.waitActive();
@@ -515,11 +526,12 @@ public class TestNameNodeProvidedImplementation {
             StorageType.PROVIDED, StorageType.DISK},
         null,
         false);
+    setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+  }
 
-    String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
+  private void setAndUnsetReplication(String filename) throws Exception {
     Path file = new Path(filename);
     FileSystem fs = cluster.getFileSystem();
-
     // set the replication to 4, and test that the file has
     // the required replication.
     short newReplication = 4;
@@ -833,7 +845,7 @@ public class TestNameNodeProvidedImplementation {
         new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
         null, false);
 
-    int fileIndex = numFiles -1;
+    int fileIndex = numFiles - 1;
 
     final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final DatanodeManager dnm = blockManager.getDatanodeManager();
@@ -890,4 +902,31 @@ public class TestNameNodeProvidedImplementation {
     // reports all 3 replicas
     verifyFileLocation(fileIndex, 3);
   }
+
+  @Test
+  public void testProvidedWithHierarchicalTopology() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+        UGIResolver.class);
+    String packageName = "org.apache.hadoop.hdfs.server.blockmanagement";
+    String[] policies = new String[] {
+        "BlockPlacementPolicyDefault",
+        "BlockPlacementPolicyRackFaultTolerant",
+        "BlockPlacementPolicyWithNodeGroup",
+        "BlockPlacementPolicyWithUpgradeDomain"};
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    String[] racks =
+        {"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
+            "/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1" };
+    for (String policy: policies) {
+      LOG.info("Using policy: " + packageName + "." + policy);
+      conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
+      startCluster(NNDIRPATH, racks.length,
+          new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+          null, false, racks);
+      verifyFileSystemContents();
+      setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+      cluster.shutdown();
+    }
+  }
 }
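A note on the new test: the placement policy the NameNode uses is pluggable
through the standard dfs.block.replicator.classname setting, which is what
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY refers to. A minimal sketch of pointing a
Configuration at one of the stock policies the test iterates over follows
(assumes hadoop-common and hadoop-hdfs on the classpath):

import org.apache.hadoop.conf.Configuration;

public class PlacementPolicySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same knob the test sets via DFS_BLOCK_REPLICATOR_CLASSNAME_KEY.
    conf.set("dfs.block.replicator.classname",
        "org.apache.hadoop.hdfs.server.blockmanagement."
            + "BlockPlacementPolicyRackFaultTolerant");
    System.out.println(conf.get("dfs.block.replicator.classname"));
  }
}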

