Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:50 UTC

[38/50] [abbrv] hadoop git commit: HDFS-8946. Improve choosing datanode storage for block placement. (yliu)

HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
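
In outline: BlockPlacementPolicyDefault previously shuffled each candidate
node's storages and probed them one by one through addIfIsGoodTarget and
isGoodTarget; this change removes that pair and makes a single
chooseStorage4Block call per requested storage type, moving the space check
into DatanodeDescriptor, whose getRemaining(StorageType, long) becomes
chooseStorage4Block(StorageType, long). The datanode now answers once for the
whole node, returning its first NORMAL storage of the requested type provided
the node-wide remaining space of that type, net of writes already scheduled,
covers MIN_BLOCKS_FOR_WRITE blocks. TestReplicationPolicy renames the existing
multi-storage test to testChooseNodeWithMultipleStorages1 and adds
testChooseNodeWithMultipleStorages2 to cover the node-wide check.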


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8fa41d9d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8fa41d9d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8fa41d9d

Branch: refs/heads/HDFS-7285
Commit: 8fa41d9dd4b923bf4141f019414a1a8b079124c6
Parents: 4eaa7fd
Author: yliu <yl...@apache.org>
Authored: Tue Sep 1 08:52:50 2015 +0800
Committer: yliu <yl...@apache.org>
Committed: Tue Sep 1 08:52:50 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../BlockPlacementPolicyDefault.java            | 147 ++++++-------------
 .../blockmanagement/DatanodeDescriptor.java     |  36 +++--
 .../blockmanagement/TestReplicationPolicy.java  |  26 +++-
 4 files changed, 93 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fa41d9d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ef8fac5..6584c84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -870,6 +870,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8990. Move RemoteBlockReader to hdfs-client module.
     (Mingliang via wheat9)
 
+    HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fa41d9d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 6d7a765..f761150 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -26,12 +26,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -458,19 +455,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
             .entrySet().iterator(); iter.hasNext(); ) {
           Map.Entry<StorageType, Integer> entry = iter.next();
-          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-              localDatanode.getStorageInfos())) {
-            StorageType type = entry.getKey();
-            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-                results, type) >= 0) {
-              int num = entry.getValue();
-              if (num == 1) {
-                iter.remove();
-              } else {
-                entry.setValue(num - 1);
-              }
-              return localStorage;
+          DatanodeStorageInfo localStorage = chooseStorage4Block(
+              localDatanode, blocksize, results, entry.getKey());
+          if (localStorage != null) {
+            // add node and related nodes to excludedNode
+            addToExcludedNodes(localDatanode, excludedNodes);
+            int num = entry.getValue();
+            if (num == 1) {
+              iter.remove();
+            } else {
+              entry.setValue(num - 1);
             }
+            return localStorage;
           }
         }
       } 
@@ -651,7 +647,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                             boolean avoidStaleNodes,
                             EnumMap<StorageType, Integer> storageTypes)
                             throws NotEnoughReplicasException {
-      
+
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
     StringBuilder builder = null;
@@ -669,49 +665,39 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
         }
         numOfAvailableNodes--;
-        if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
+        DatanodeStorageInfo storage = null;
+        if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
             results, avoidStaleNodes)) {
-          if (LOG.isDebugEnabled()) {
-            builder.append("\n]");
-          }
-          badTarget = true;
-          continue;
-        }
-
-        final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
-            chosenNode.getStorageInfos());
-        int i = 0;
-        boolean search = true;
-        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
-            .entrySet().iterator(); search && iter.hasNext(); ) {
-          Map.Entry<StorageType, Integer> entry = iter.next();
-          for (i = 0; i < storages.length; i++) {
-            StorageType type = entry.getKey();
-            final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-                excludedNodes, blocksize, results, type);
-            if (newExcludedNodes >= 0) {
+          for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+              .entrySet().iterator(); iter.hasNext(); ) {
+            Map.Entry<StorageType, Integer> entry = iter.next();
+            storage = chooseStorage4Block(
+                chosenNode, blocksize, results, entry.getKey());
+            if (storage != null) {
               numOfReplicas--;
               if (firstChosen == null) {
-                firstChosen = storages[i];
+                firstChosen = storage;
               }
-              numOfAvailableNodes -= newExcludedNodes;
+              // add node and related nodes to excludedNode
+              numOfAvailableNodes -=
+                  addToExcludedNodes(chosenNode, excludedNodes);
               int num = entry.getValue();
               if (num == 1) {
                 iter.remove();
               } else {
                 entry.setValue(num - 1);
               }
-              search = false;
               break;
             }
           }
         }
+
         if (LOG.isDebugEnabled()) {
           builder.append("\n]");
         }
 
         // If no candidate storage was found on this DN then set badTarget.
-        badTarget = (i == storages.length);
+        badTarget = (storage == null);
       }
     }
       
@@ -740,32 +726,27 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /**
-   * If the given storage is a good target, add it to the result list and
-   * update the set of excluded nodes.
-   * @return -1 if the given is not a good target;
-   *         otherwise, return the number of nodes added to excludedNodes set.
+   * Choose a good storage of the given storage type from the datanode, and
+   * add it to the result list.
+   *
+   * @param dnd datanode descriptor
+   * @param blockSize requested block size
+   * @param results the result storages
+   * @param storageType requested storage type
+   * @return the chosen datanode storage
    */
-  int addIfIsGoodTarget(DatanodeStorageInfo storage,
-      Set<Node> excludedNodes,
+  DatanodeStorageInfo chooseStorage4Block(DatanodeDescriptor dnd,
       long blockSize,
       List<DatanodeStorageInfo> results,
       StorageType storageType) {
-    if (isGoodTarget(storage, blockSize, results, storageType)) {
+    DatanodeStorageInfo storage =
+        dnd.chooseStorage4Block(storageType, blockSize);
+    if (storage != null) {
       results.add(storage);
-      // add node and related nodes to excludedNode
-      return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
-    } else { 
-      return -1;
-    }
-  }
-
-  private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
-    if (LOG.isDebugEnabled()) {
-      // build the error message for later use.
-      debugLoggingBuilder.get()
-          .append("\n  Storage ").append(storage)
-          .append(" is not chosen since ").append(reason).append(".");
+    } else {
+      logNodeIsNotChosen(dnd, "no good storage to place the block ");
     }
+    return storage;
   }
 
   private static void logNodeIsNotChosen(DatanodeDescriptor node,
@@ -837,52 +818,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /**
-   * Determine if a storage is a good target.
-   *
-   * @param storage The target storage
-   * @param blockSize Size of block
-   * @param results A list containing currently chosen nodes. Used to check if
-   *                too many nodes has been chosen in the target rack.
-   * @return Return true if <i>node</i> has enough space.
-   */
-  private boolean isGoodTarget(DatanodeStorageInfo storage,
-                               long blockSize,
-                               List<DatanodeStorageInfo> results,
-                               StorageType requiredStorageType) {
-    if (storage.getStorageType() != requiredStorageType) {
-      logNodeIsNotChosen(storage, "storage types do not match,"
-          + " where the required storage type is " + requiredStorageType);
-      return false;
-    }
-    if (storage.getState() == State.READ_ONLY_SHARED) {
-      logNodeIsNotChosen(storage, "storage is read-only");
-      return false;
-    }
-
-    if (storage.getState() == State.FAILED) {
-      logNodeIsNotChosen(storage, "storage has failed");
-      return false;
-    }
-
-    DatanodeDescriptor node = storage.getDatanodeDescriptor();
-
-    final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
-    final long remaining = node.getRemaining(storage.getStorageType(),
-        requiredSize);
-    if (requiredSize > remaining - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough "
-          + storage.getStorageType() + " space"
-          + " (required=" + requiredSize
-          + ", scheduled=" + scheduledSize
-          + ", remaining=" + remaining + ")");
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
    * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that 
    * starts from the writer and traverses all <i>nodes</i>

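Both refactored call sites above, the local-storage fast path and the
chooseRandom loop, share the same bookkeeping around chooseStorage4Block:
storageTypes maps each requested StorageType to the number of replicas still
needing that type, and a successful placement decrements the count, dropping
the entry once it reaches zero so later candidate nodes are no longer probed
for that type. A minimal standalone sketch of that pattern (the enum and class
below are illustrative stand-ins, not HDFS code):

    import java.util.EnumMap;
    import java.util.Iterator;
    import java.util.Map;

    public class StorageTypeBookkeeping {
      enum StorageType { DISK, SSD }

      static void consumeOne(EnumMap<StorageType, Integer> storageTypes,
          StorageType placed) {
        for (Iterator<Map.Entry<StorageType, Integer>> iter =
            storageTypes.entrySet().iterator(); iter.hasNext(); ) {
          Map.Entry<StorageType, Integer> entry = iter.next();
          if (entry.getKey() != placed) {
            continue;
          }
          int num = entry.getValue();
          if (num == 1) {
            iter.remove();           // last replica of this type placed
          } else {
            entry.setValue(num - 1); // this type still needs num - 1 replicas
          }
          return;
        }
      }

      public static void main(String[] args) {
        EnumMap<StorageType, Integer> wanted = new EnumMap<>(StorageType.class);
        wanted.put(StorageType.DISK, 2);
        wanted.put(StorageType.SSD, 1);
        consumeOne(wanted, StorageType.SSD);
        consumeOne(wanted, StorageType.DISK);
        System.out.println(wanted);  // prints {DISK=1}
      }
    }
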
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fa41d9d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 7e3c59b..0b398c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -31,14 +31,15 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import com.google.common.collect.ImmutableList;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -663,26 +664,39 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
-   * Return the sum of remaining spaces of the specified type. If the remaining
-   * space of a storage is less than minSize, it won't be counted toward the
-   * sum.
+   * Find whether the datanode contains a good storage of the given type to
+   * place a block of size <code>blockSize</code>.
    *
-   * @param t The storage type. If null, the type is ignored.
-   * @param minSize The minimum free space required.
-   * @return the sum of remaining spaces that are bigger than minSize.
+   * <p>Currently the datanode only cares about the storage type; the first
+   * storage of the given type seen in this method is returned.
+   *
+   * @param t requested storage type
+   * @param blockSize requested block size
+   * @return the first storage of the given type, or null if the node lacks space
    */
-  public long getRemaining(StorageType t, long minSize) {
+  public DatanodeStorageInfo chooseStorage4Block(StorageType t,
+      long blockSize) {
+    final long requiredSize =
+        blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
+    final long scheduledSize = blockSize * getBlocksScheduled(t);
     long remaining = 0;
+    DatanodeStorageInfo storage = null;
     for (DatanodeStorageInfo s : getStorageInfos()) {
       if (s.getState() == State.NORMAL &&
-          (t == null || s.getStorageType() == t)) {
+          s.getStorageType() == t) {
+        if (storage == null) {
+          storage = s;
+        }
         long r = s.getRemaining();
-        if (r >= minSize) {
+        if (r >= requiredSize) {
           remaining += r;
         }
       }
     }
-    return remaining;
+    if (requiredSize > remaining - scheduledSize) {
+      return null;
+    }
+    return storage;
   }
 
   /**

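The space rule that the new DatanodeDescriptor#chooseStorage4Block applies can
be restated as a self-contained sketch. Storage, State and StorageType below
are simplified stand-ins for the real classes, and MIN_BLOCKS_FOR_WRITE is
assumed to be 1 as in HdfsServerConstants:

    import java.util.Arrays;
    import java.util.List;

    public class ChooseStorageSketch {
      enum State { NORMAL, FAILED, READ_ONLY_SHARED }
      enum StorageType { DISK, SSD, ARCHIVE, RAM_DISK }

      static class Storage {
        final StorageType type;
        final State state;
        final long remaining;       // bytes still free on this volume

        Storage(StorageType type, State state, long remaining) {
          this.type = type;
          this.state = state;
          this.remaining = remaining;
        }
      }

      // Assumed value of HdfsServerConstants.MIN_BLOCKS_FOR_WRITE.
      static final long MIN_BLOCKS_FOR_WRITE = 1;

      // Returns the first NORMAL storage of the requested type, but only if
      // the node as a whole has room: the sum of remaining space over storages
      // of that type (counting only volumes with at least requiredSize free),
      // net of space already scheduled for pending writes, must cover
      // requiredSize.
      static Storage chooseStorage4Block(List<Storage> storages, StorageType t,
          long blockSize, int blocksScheduled) {
        final long requiredSize = blockSize * MIN_BLOCKS_FOR_WRITE;
        final long scheduledSize = blockSize * blocksScheduled;
        long remaining = 0;
        Storage chosen = null;
        for (Storage s : storages) {
          if (s.state == State.NORMAL && s.type == t) {
            if (chosen == null) {
              chosen = s;           // first matching storage wins
            }
            if (s.remaining >= requiredSize) {
              remaining += s.remaining;
            }
          }
        }
        return (requiredSize > remaining - scheduledSize) ? null : chosen;
      }

      public static void main(String[] args) {
        long block = 128L << 20;    // 128 MB
        List<Storage> disks = Arrays.asList(
            new Storage(StorageType.DISK, State.NORMAL, block / 2),
            new Storage(StorageType.DISK, State.NORMAL, 4 * block));
        // The first DISK volume is returned even though the second one
        // supplied the free space; the check is node-wide, not per volume.
        Storage s = chooseStorage4Block(disks, StorageType.DISK, block, 0);
        System.out.println(s == disks.get(0));  // prints true
      }
    }

Note the asymmetry the javadoc calls out: the returned storage is simply the
first NORMAL one of the requested type, while the space check is made against
the node-wide sum, so the returned volume need not be the one with the room.
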
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fa41d9d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index cec33fe..27d647c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -181,7 +181,7 @@ public class TestReplicationPolicy {
    * considered.
    */
   @Test
-  public void testChooseNodeWithMultipleStorages() throws Exception {
+  public void testChooseNodeWithMultipleStorages1() throws Exception {
     updateHeartbeatWithUsage(dataNodes[5],
         2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
@@ -201,6 +201,30 @@ public class TestReplicationPolicy {
   }
 
   /**
+   * Test whether all storages on the datanode are considered while
+   * choosing a target to place the block.
+   */
+  @Test
+  public void testChooseNodeWithMultipleStorages2() throws Exception {
+    updateHeartbeatWithUsage(dataNodes[5],
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
+        0L, 0L, 0, 0);
+
+    updateHeartbeatForExtraStorage(
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
+
+    DatanodeStorageInfo[] targets;
+    targets = chooseTarget (1, dataNodes[5],
+        new ArrayList<DatanodeStorageInfo>(), null);
+    assertEquals(1, targets.length);
+    assertEquals(dataNodes[5], targets[0].getDatanodeDescriptor());
+
+    resetHeartbeatForStorages();
+  }
+
+  /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
    * different rack and third should be placed on different node
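
Reading testChooseNodeWithMultipleStorages2 through that rule, writing m for
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE and B for BLOCK_SIZE: the node's
original storage reports remaining = 2mB/3, below requiredSize = mB, so it
contributes nothing to the node-wide sum; the extra storage added by
updateHeartbeatForExtraStorage reports exactly mB, which does count. The sum
(mB) minus the scheduled size (0) is not less than requiredSize, so
dataNodes[5] qualifies and both assertions hold. A check confined to the first
volume alone would have rejected the node, which is exactly what this test
guards against.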