Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2021/03/27 08:57:29 UTC

[hadoop] branch trunk updated: HDFS-15879. Exclude slow nodes when choose targets for blocks (#2748)

This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72037a6  HDFS-15879. Exclude slow nodes when choose targets for blocks (#2748)
72037a6 is described below

commit 72037a63b176b928ece4443d7ac14a14be89dc8f
Author: litao <to...@gmail.com>
AuthorDate: Sat Mar 27 16:57:09 2021 +0800

    HDFS-15879. Exclude slow nodes when choose targets for blocks (#2748)
    
    Reviewed-by: Dinesh Chitlangia <di...@apache.org>
    Reviewed-by: Takanobu Asanuma <ta...@apache.org>
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  15 +++
 .../BlockPlacementPolicyDefault.java               |  22 +++-
 .../server/blockmanagement/DatanodeManager.java    | 109 ++++++++++++++++-
 .../server/blockmanagement/SlowPeerTracker.java    |  18 +++
 .../src/main/resources/hdfs-default.xml            |  30 +++++
 .../TestReplicationPolicyExcludeSlowNodes.java     | 131 +++++++++++++++++++++
 6 files changed, 322 insertions(+), 3 deletions(-)
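
At a glance, the change wires four pieces together: DataNode peer-latency
stats (dfs.datanode.peer.stats.enabled) feed the NameNode's SlowPeerTracker,
a background collector snapshots the slowest nodes on a fixed interval, and
the default block placement policy rejects nodes in that snapshot as targets.
As a hedged illustration only, not part of this commit, the sketch below
shows how the new keys might be set programmatically; the key names come
from the diff, the helper class name is made up, and
Configuration.setTimeDuration is assumed to be available as in current
Hadoop releases:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class EnableSlowNodeExclusion {  // hypothetical helper
      public static Configuration build() {
        Configuration conf = new Configuration();
        // Prerequisite: DataNodes must report peer latencies.
        conf.setBoolean("dfs.datanode.peer.stats.enabled", true);
        // New in this patch: exclude slow nodes from placement targets.
        conf.setBoolean(
            "dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled",
            true);
        // Optional tuning; the values shown are the new defaults.
        conf.setInt("dfs.namenode.max.slowpeer.collect.nodes", 5);
        conf.setTimeDuration(
            "dfs.namenode.slowpeer.collect.interval", 30, TimeUnit.MINUTES);
        return conf;
      }
    }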

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b2c9454..23897a5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1023,6 +1023,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.outliers.report.interval";
   public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
       "30m";
+  public static final String DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY =
+      "dfs.namenode.max.slowpeer.collect.nodes";
+  public static final int DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT =
+      5;
+  public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY =
+      "dfs.namenode.slowpeer.collect.interval";
+  public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT =
+      "30m";
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -1176,6 +1184,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
       "dfs.namenode.block-placement-policy.default.prefer-local-node";
   public static final boolean  DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
+  public static final String
+      DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY =
+      "dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled";
+  public static final boolean
+      DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
+      false;
+
   public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
       "dfs.namenode.gc.time.monitor.enable";
   public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index df687f4..9f68c36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -82,7 +84,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     NODE_TOO_BUSY("the node is too busy"),
     TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
     NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
-    NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable");
+    NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
+    NODE_SLOW("the node is too slow");
 
     private final String text;
 
@@ -99,6 +102,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
   private boolean preferLocalNode;
+  private boolean dataNodePeerStatsEnabled;
+  private boolean excludeSlowNodesEnabled;
   protected NetworkTopology clusterMap;
   protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
@@ -144,6 +149,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY,
         DFSConfigKeys.
             DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT);
+    this.dataNodePeerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+    this.excludeSlowNodesEnabled = conf.getBoolean(
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
   }
 
   @Override
@@ -1091,6 +1102,15 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       return false;
     }
 
+    // check if the target is a slow node
+    if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
+      Set<Node> nodes = DatanodeManager.getSlowNodes();
+      if (nodes.contains(node)) {
+        logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
+        return false;
+      }
+    }
+
     return true;
   }
 
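The check added above is deliberately cheap: both flags are read once when
the policy is initialized, and membership is tested against the set that
DatanodeManager pre-collects in the background (see the next file), so the
write path never queries the tracker directly. A minimal standalone sketch
of the same gating shape, with hypothetical class and method names, for
illustration only:

    import java.util.Set;

    /** Hypothetical mirror of the slow-node gate in the policy above. */
    class SlowNodeFilter<T> {
      private final boolean peerStatsEnabled;  // dfs.datanode.peer.stats.enabled
      private final boolean excludeSlowNodes;  // exclude-slow-nodes.enabled

      SlowNodeFilter(boolean peerStatsEnabled, boolean excludeSlowNodes) {
        this.peerStatsEnabled = peerStatsEnabled;
        this.excludeSlowNodes = excludeSlowNodes;
      }

      /** Returns true if the candidate should be rejected as a target. */
      boolean reject(T candidate, Set<T> slowNodes) {
        return peerStatsEnabled && excludeSlowNodes
            && slowNodes.contains(candidate);
      }
    }
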
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 454e484..6939e65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
@@ -53,6 +57,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Timer;
 
@@ -201,8 +206,16 @@ public class DatanodeManager {
    */
   private final boolean useDfsNetworkTopology;
 
+  private static final String IP_PORT_SEPARATOR = ":";
+
   @Nullable
   private final SlowPeerTracker slowPeerTracker;
+  private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
+  private Daemon slowPeerCollectorDaemon;
+  private final long slowPeerCollectionInterval;
+  private final int maxSlowPeerReportNodes;
+  private boolean excludeSlowNodesEnabled;
+
   @Nullable
   private final SlowDiskTracker slowDiskTracker;
   
@@ -242,11 +255,22 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
         DFSConfigKeys.
             DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
-
     final Timer timer = new Timer();
     this.slowPeerTracker = dataNodePeerStatsEnabled ?
         new SlowPeerTracker(conf, timer) : null;
-
+    this.excludeSlowNodesEnabled = conf.getBoolean(
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
+    this.maxSlowPeerReportNodes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
+    this.slowPeerCollectionInterval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    if (slowPeerTracker != null && excludeSlowNodesEnabled) {
+      startSlowPeerCollector();
+    }
     this.slowDiskTracker = dataNodeDiskStatsEnabled ?
         new SlowDiskTracker(conf, timer) : null;
 
@@ -356,6 +380,44 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
   }
 
+  private void startSlowPeerCollector() {
+    if (slowPeerCollectorDaemon != null) {
+      return;
+    }
+    slowPeerCollectorDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            slowNodesSet = getSlowPeers();
+          } catch (Exception e) {
+            LOG.error("Failed to collect slow peers", e);
+          }
+
+          try {
+            Thread.sleep(slowPeerCollectionInterval);
+          } catch (InterruptedException e) {
+            LOG.error("Slow peers collection thread interrupted", e);
+            return;
+          }
+        }
+      }
+    });
+    slowPeerCollectorDaemon.start();
+  }
+
+  public void stopSlowPeerCollector() {
+    if (slowPeerCollectorDaemon == null) {
+      return;
+    }
+    slowPeerCollectorDaemon.interrupt();
+    try {
+      slowPeerCollectorDaemon.join();
+    } catch (InterruptedException e) {
+      LOG.error("Slow peers collection thread did not shut down", e);
+    }
+  }
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -401,6 +463,7 @@ public class DatanodeManager {
   void close() {
     datanodeAdminManager.close();
     heartbeatManager.close();
+    stopSlowPeerCollector();
   }
 
   /** @return the network topology. */
@@ -2020,6 +2083,48 @@ public class DatanodeManager {
   }
 
   /**
+   * Returns the currently tracked slow peers as datanode descriptors.
+   * @return the set of slow nodes; empty if peer stats are disabled
+   */
+  public Set<Node> getSlowPeers() {
+    Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
+    if (slowPeerTracker == null) {
+      return slowPeersSet;
+    }
+    ArrayList<String> slowNodes =
+        slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
+    for (String slowNode : slowNodes) {
+      if (StringUtils.isBlank(slowNode)
+              || !slowNode.contains(IP_PORT_SEPARATOR)) {
+        continue;
+      }
+      String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
+      DatanodeDescriptor datanodeByHost =
+          host2DatanodeMap.getDatanodeByHost(ipAddr);
+      if (datanodeByHost != null) {
+        slowPeersSet.add(datanodeByHost);
+      }
+    }
+    return slowPeersSet;
+  }
+
+  /**
+   * Returns the latest snapshot of slow nodes taken by the collector.
+   * @return the cached set of slow nodes
+   */
+  public static Set<Node> getSlowNodes() {
+    return slowNodesSet;
+  }
+
+  /**
+   * Use only for testing.
+   */
+  @VisibleForTesting
+  public SlowPeerTracker getSlowPeerTracker() {
+    return slowPeerTracker;
+  }
+
+  /**
    * Use only for testing.
    */
   @VisibleForTesting
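
Two details of the collector above are worth calling out. Each pass replaces
the whole slowNodesSet reference rather than mutating the set in place, so
readers on the placement path always see a complete snapshot, and an
interrupt makes the loop return, which is what stopSlowPeerCollector()
relies on. Below is a hedged, self-contained sketch of that
snapshot-and-swap pattern with hypothetical names; the sketch marks its
field volatile so the swap is visible across threads, whereas the patch
uses a plain static field:

    import java.util.Collections;
    import java.util.Set;
    import java.util.function.Supplier;

    /** Hypothetical sketch of the collector's snapshot-and-swap pattern. */
    class PeriodicSnapshot<T> {
      private volatile Set<T> snapshot = Collections.emptySet();

      /** Runs until interrupted, mirroring the Daemon body above. */
      void loop(Supplier<Set<T>> collect, long intervalMs) {
        while (true) {
          try {
            snapshot = collect.get();  // swap the reference, never mutate
          } catch (Exception e) {
            // one failed collection should not kill the daemon
          }
          try {
            Thread.sleep(intervalMs);
          } catch (InterruptedException e) {
            return;  // the stop path: interrupt, then join
          }
        }
      }

      Set<T> current() {
        return snapshot;
      }
    }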
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index 06dd2c0..c4b1861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -234,6 +235,23 @@ public class SlowPeerTracker {
   }
 
   /**
+   * Returns the addresses of the slowest tracked nodes.
+   * @param numNodes maximum number of nodes to return
+   * @return slow node addresses in ip:port form
+   */
+  public ArrayList<String> getSlowNodes(int numNodes) {
+    Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
+    ArrayList<String> slowNodes = new ArrayList<>();
+    for (ReportForJson jsonReport : jsonReports) {
+      slowNodes.add(jsonReport.getSlowNode());
+    }
+    if (!slowNodes.isEmpty()) {
+      LOG.warn("Slow nodes list: {}", slowNodes);
+    }
+    return slowNodes;
+  }
+
+  /**
    * Retrieve reports in a structure for generating JSON, limiting the
    * output to the top numNodes nodes i.e nodes with the most reports.
    * @param numNodes number of nodes to return. This is to limit the
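
For orientation, getSlowNodes() returns the same node keys that addReport()
was fed, as ip:port strings ranked by how many peers reported them, and
DatanodeManager.getSlowPeers() then maps those strings back to datanode
descriptors. A minimal usage sketch with made-up addresses, assuming the
constructor and signatures as they appear in this file:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
    import org.apache.hadoop.util.Timer;

    public class SlowPeerTrackerExample {  // hypothetical
      public static void main(String[] args) {
        SlowPeerTracker tracker =
            new SlowPeerTracker(new Configuration(), new Timer());
        // addReport(slowNode, reportingNode): two peers report 10.0.0.3.
        tracker.addReport("10.0.0.3:9866", "10.0.0.1:9866");
        tracker.addReport("10.0.0.3:9866", "10.0.0.2:9866");
        List<String> worst = tracker.getSlowNodes(5);
        System.out.println(worst);  // expected: [10.0.0.3:9866]
      }
    }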
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index df4df48..57e3b1b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2369,6 +2369,36 @@
 </property>
 
 <property>
+  <name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
+  <value>false</value>
+  <description>
+    If this is set to true, we will filter out slow nodes
+    when choosing targets for blocks.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.max.slowpeer.collect.nodes</name>
+  <value>5</value>
+  <description>
+    The maximum number of slow nodes to collect for exclusion
+    when choosing targets for blocks.
+
+    It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.slowpeer.collect.interval</name>
+  <value>30m</value>
+  <description>
+    Interval at which the slow peer collector runs in the background to collect slow peers.
+
+    It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.fileio.profiling.sampling.percentage</name>
   <value>0</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
new file mode 100644
index 0000000..f40317d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestReplicationPolicyExcludeSlowNodes
+    extends BaseReplicationPolicyTest {
+
+  public TestReplicationPolicyExcludeSlowNodes(String blockPlacementPolicy) {
+    this.blockPlacementPolicy = blockPlacementPolicy;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {BlockPlacementPolicyDefault.class.getName()},
+        {BlockPlacementPolicyWithUpgradeDomain.class.getName()},
+        {AvailableSpaceBlockPlacementPolicy.class.getName()},
+        {BlockPlacementPolicyRackFaultTolerant.class.getName()},
+        {AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class.getName()},
+    });
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        true);
+    conf.setStrings(DFSConfigKeys
+        .DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+        "1s");
+    conf.setBoolean(DFSConfigKeys
+        .DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        true);
+    final String[] racks = {
+        "/rack1",
+        "/rack2",
+        "/rack3",
+        "/rack4",
+        "/rack5",
+        "/rack6"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    return DFSTestUtil.toDatanodeDescriptor(storages);
+  }
+
+  /**
+   * Tests chooseTarget() when excludeSlowNodesEnabled is set to true.
+   */
+  @Test
+  public void testChooseTargetExcludeSlowNodes() throws Exception {
+    namenode.getNamesystem().writeLock();
+    try {
+      // add nodes
+      for (int i = 0; i < dataNodes.length; i++) {
+        dnManager.addDatanode(dataNodes[i]);
+      }
+
+      // mock slow nodes
+      SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
+
+      // waiting for slow nodes collector run
+      Thread.sleep(3000);
+
+      // fetch slow nodes
+      Set<Node> slowPeers = dnManager.getSlowPeers();
+
+      // assert slow nodes
+      assertEquals(3, slowPeers.size());
+      for (int i = 0; i < slowPeers.size(); i++) {
+        assertTrue(slowPeers.contains(dataNodes[i]));
+      }
+
+      // mock writer
+      DatanodeDescriptor writerDn = dataNodes[0];
+
+      // call chooseTarget()
+      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
+              writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
+              1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+
+      // assert targets
+      assertEquals(3, targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
+      }
+    } finally {
+      namenode.getNamesystem().writeUnlock();
+    }
+    NameNode.LOG.info("Done working on it");
+  }
+
+}
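
A side note on the fixed Thread.sleep(3000) above: with the collect interval
set to 1s it gives the collector a few passes, but a polling wait is the
usual way to make such tests less timing-sensitive. A hedged alternative,
assuming org.apache.hadoop.test.GenericTestUtils (already used widely in
this module) is on the test classpath:

    // Instead of a fixed sleep, poll until the collector has published
    // a snapshot (reuses this test's context plus GenericTestUtils):
    GenericTestUtils.waitFor(
        () -> !DatanodeManager.getSlowNodes().isEmpty(),
        100,      // check every 100 ms
        10000);   // give up after 10 s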
