You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:11 UTC

[06/50] [abbrv] hadoop git commit: HDFS-11309. [SPS]: chooseTargetTypeInSameNode should pass accurate block size to chooseStorage4Block while choosing target. Contributed by Uma Maheswara Rao G

HDFS-11309. [SPS]: chooseTargetTypeInSameNode should pass accurate block size to chooseStorage4Block while choosing target. Contributed by Uma Maheswara Rao G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f12aee0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f12aee0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f12aee0

Branch: refs/heads/HDFS-10285
Commit: 7f12aee0e24966a8464b1f4dbb8e8ff6db067b05
Parents: 09e5176
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Fri Jan 20 21:37:51 2017 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Mon Jan 29 09:17:44 2018 +0530

----------------------------------------------------------------------
 .../server/namenode/StoragePolicySatisfier.java |  31 +++---
 .../namenode/TestStoragePolicySatisfier.java    | 108 ++++++++++++++++---
 2 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f12aee0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 3b19833..1c48910 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -403,24 +403,25 @@ public class StoragePolicySatisfier implements Runnable {
     List<StorageType> sourceStorageTypes = new ArrayList<>();
     List<DatanodeInfo> targetNodes = new ArrayList<>();
     List<StorageType> targetStorageTypes = new ArrayList<>();
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+    List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
     // storage within same node if possible. This is done separately to
     // avoid choosing a target which already has this block.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
       StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-      StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
+      StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
           existingTypeNodePair.dn, expected);
       if (chosenTarget != null) {
         sourceNodes.add(existingTypeNodePair.dn);
         sourceStorageTypes.add(existingTypeNodePair.storageType);
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
-        chosenNodes.add(chosenTarget.dn);
         expected.remove(chosenTarget.storageType);
         // TODO: We can increment scheduled block count for this node?
       }
+      // Exclude this source node so that it is not chosen as a target later
+      excludeNodes.add(existingTypeNodePair.dn);
     }
 
     // Looping over all the source node locations. Choose a remote target
@@ -437,28 +438,28 @@ public class StoragePolicySatisfier implements Runnable {
           .getNetworkTopology().isNodeGroupAware()) {
         chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
             expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
-            chosenNodes);
+            excludeNodes);
       }
 
       // Then, match nodes on the same rack
       if (chosenTarget == null) {
         chosenTarget =
             chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
+                Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
       }
 
       if (chosenTarget == null) {
         chosenTarget =
             chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
+                Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
       }
       if (null != chosenTarget) {
         sourceNodes.add(existingTypeNodePair.dn);
         sourceStorageTypes.add(existingTypeNodePair.storageType);
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
-        chosenNodes.add(chosenTarget.dn);
         expected.remove(chosenTarget.storageType);
+        excludeNodes.add(chosenTarget.dn);
         // TODO: We can increment scheduled block count for this node?
       } else {
         LOG.warn(
@@ -554,14 +555,18 @@ public class StoragePolicySatisfier implements Runnable {
   /**
    * Choose the target storage within same datanode if possible.
    *
-   * @param source source datanode
-   * @param targetTypes list of target storage types
+   * @param block
+   *          - block info
+   * @param source
+   *          - source datanode
+   * @param targetTypes
+   *          - list of target storage types
    */
-  private StorageTypeNodePair chooseTargetTypeInSameNode(
+  private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
       DatanodeDescriptor source, List<StorageType> targetTypes) {
     for (StorageType t : targetTypes) {
       DatanodeStorageInfo chooseStorage4Block =
-          source.chooseStorage4Block(t, 0);
+          source.chooseStorage4Block(t, block.getNumBytes());
       if (chooseStorage4Block != null) {
         return new StorageTypeNodePair(t, source);
       }
@@ -572,7 +577,7 @@ public class StoragePolicySatisfier implements Runnable {
   private StorageTypeNodePair chooseTarget(Block block,
       DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
       StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> chosenNodes) {
+      List<DatanodeDescriptor> excludeNodes) {
     for (StorageType t : targetTypes) {
       List<DatanodeDescriptor> nodesWithStorages =
           locsForExpectedStorageTypes.getNodesWithStorages(t);
@@ -581,7 +586,7 @@ public class StoragePolicySatisfier implements Runnable {
       }
       Collections.shuffle(nodesWithStorages);
       for (DatanodeDescriptor target : nodesWithStorages) {
-        if (!chosenNodes.contains(target) && matcher.match(
+        if (!excludeNodes.contains(target) && matcher.match(
             blockManager.getDatanodeManager().getNetworkTopology(), source,
             target)) {
           if (null != target.chooseStorage4Block(t, block.getNumBytes())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f12aee0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 1c53894..de73e8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -57,6 +60,8 @@ import com.google.common.base.Supplier;
  * moved and finding its suggested target locations to move.
  */
 public class TestStoragePolicySatisfier {
+  private static final String ONE_SSD = "ONE_SSD";
+  private static final String COLD = "COLD";
   private static final Logger LOG =
       LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
   private final Configuration config = new HdfsConfiguration();
@@ -93,7 +98,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -151,7 +156,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -184,7 +189,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -232,7 +237,7 @@ public class TestStoragePolicySatisfier {
       List<Long> blockCollectionIds = new ArrayList<>();
       // Change policy to ONE_SSD
       for (String fileName : files) {
-        dfs.setStoragePolicy(new Path(fileName), "ONE_SSD");
+        dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
         INode inode = namesystem.getFSDirectory().getINode(fileName);
         blockCollectionIds.add(inode.getId());
       }
@@ -274,12 +279,12 @@ public class TestStoragePolicySatisfier {
       HdfsAdmin hdfsAdmin =
           new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       StorageType[][] newtypes =
-          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE}};
       startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
 
@@ -314,7 +319,7 @@ public class TestStoragePolicySatisfier {
       writeContent(subFile2);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(subDir), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(subDir), ONE_SSD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -418,7 +423,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -463,7 +468,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -533,7 +538,7 @@ public class TestStoragePolicySatisfier {
     final String file1 = createFileAndSimulateFavoredNodes(2);
 
     // Change policy to COLD
-    dfs.setStoragePolicy(new Path(file1), "COLD");
+    dfs.setStoragePolicy(new Path(file1), COLD);
     FSNamesystem namesystem = hdfsCluster.getNamesystem();
     INode inode = namesystem.getFSDirectory().getINode(file1);
 
@@ -594,7 +599,7 @@ public class TestStoragePolicySatisfier {
       writeContent(file, (short) 5);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -633,7 +638,7 @@ public class TestStoragePolicySatisfier {
       writeContent(file);
 
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -688,6 +693,77 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests that movements should not be assigned when there is no space in
+   * target DN.
+   */
+  @Test(timeout = 300000)
+  public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
+      throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
+    long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
+          storagesPerDatanode, dnCapacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+      Path filePath = new Path("/testChooseInSameDatanode");
+      final FSDataOutputStream out =
+          dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
+      try {
+        dfs.setStoragePolicy(filePath, ONE_SSD);
+        // Try to fill up SSD part by writing content
+        long remaining = dfs.getStatus().getRemaining() / (3 * 2);
+        for (int i = 0; i < remaining; i++) {
+          out.write(i);
+        }
+      } finally {
+        out.close();
+      }
+      hdfsCluster.triggerHeartbeats();
+      ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes();
+      // Temporarily disable heartbeats so that we can assert whether any
+      // items were scheduled for DNs even though the DNs do not have space to
+      // write. Disabling heartbeats keeps scheduled items on the
+      // DatanodeDescriptor itself.
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+      }
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+      // Wait for items to be processed
+      waitForAttemptedItems(1, 30000);
+
+      // Make sure no items assigned for movements
+      Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
+          .getBlockManager().getDatanodeManager().getDatanodes();
+      for (DatanodeDescriptor dd : dns) {
+        assertNull(dd.getBlocksToMoveStorages());
+      }
+
+      // Enable heart beats now
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false);
+      }
+      hdfsCluster.triggerHeartbeats();
+
+      DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@@ -769,8 +845,8 @@ public class TestStoragePolicySatisfier {
     // write to DISK
     final FSDataOutputStream out = dfs.create(new Path(fileName),
         replicatonFactor);
-    for (int i = 0; i < 1000; i++) {
-      out.writeChars("t");
+    for (int i = 0; i < 1024; i++) {
+      out.write(i);
     }
     out.close();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org