Posted to common-commits@hadoop.apache.org by so...@apache.org on 2020/05/12 15:26:59 UTC

[hadoop] branch branch-3.3 updated: HDFS-15255. Consider StorageType when DatanodeManager#sortLocatedBlock(). Contributed by Lisheng Sun.

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 433aaee  HDFS-15255. Consider StorageType when DatanodeManager#sortLocatedBlock(). Contributed by Lisheng Sun.
433aaee is described below

commit 433aaeefa49f9e745abad200aedfcf85934adf45
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Tue May 12 14:34:14 2020 +0100

    HDFS-15255. Consider StorageType when DatanodeManager#sortLocatedBlock(). Contributed by Lisheng Sun.
---
 .../org/apache/hadoop/net/NetworkTopology.java     |   1 +
 .../apache/hadoop/hdfs/protocol/LocatedBlock.java  |   2 +-
 .../hdfs/server/federation/MockNamenode.java       |   7 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../server/blockmanagement/DatanodeManager.java    |  27 ++-
 .../hadoop/hdfs/server/namenode/CacheManager.java  |   6 +
 .../src/main/resources/hdfs-default.xml            |  14 ++
 .../org/apache/hadoop/hdfs/TestDFSInputStream.java |   7 +-
 .../blockmanagement/TestDatanodeManager.java       | 173 +++++++++++++++++++++
 9 files changed, 234 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 33493ce..cd5490d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -950,6 +950,7 @@ public class NetworkTopology {
    * <p>
    * As an additional twist, we also randomize the nodes at each network
    * distance. This helps with load balancing when there is data skew.
+   * It also helps to prefer nodes with a faster storage type.
    *
    * @param reader    Node where data will be read
    * @param nodes     Available replicas with the requested data
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index c1b6a54..ed4d668 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -158,7 +158,7 @@ public class LocatedBlock {
    * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
    * to update the cached Storage ID/Type arrays.
    */
-  public DatanodeInfo[] getLocations() {
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
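The return type change above is source-compatible: DatanodeInfoWithStorage extends DatanodeInfo, so existing callers keep compiling, while new callers can read the storage information without a cast. A minimal standalone sketch of the new usage, built with the same constructors the tests below use (the storage ID and type values here are purely illustrative):

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    public class LocatedBlockStorageSketch {
      public static void main(String[] args) {
        // Build a single-replica block, mirroring the test setup below.
        DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
            1111, 1112, 1113, 1114);
        DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
        LocatedBlock lb = new LocatedBlock(new ExtendedBlock("somePoolID", 1234),
            new DatanodeInfo[] {dnInfo}, new String[] {"storageID-0"},
            new StorageType[] {StorageType.SSD});
        // No cast needed: getLocations() now returns DatanodeInfoWithStorage[].
        for (DatanodeInfoWithStorage loc : lb.getLocations()) {
          System.out.println(loc.getIpAddr() + " -> " + loc.getStorageType());
        }
      }
    }
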
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 699ea92..7c80ad6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceStatus;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -537,7 +539,10 @@ public class MockNamenode {
     DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
         1111, 1112, 1113, 1114);
     DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
-    when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+    DatanodeInfoWithStorage datanodeInfoWithStorage =
+        new DatanodeInfoWithStorage(dnInfo, "storageID", StorageType.DEFAULT);
+    when(lb.getLocations())
+        .thenReturn(new DatanodeInfoWithStorage[] {datanodeInfoWithStorage});
     ExtendedBlock eb = mock(ExtendedBlock.class);
     when(eb.getBlockPoolId()).thenReturn(nsId);
     when(lb.getBlock()).thenReturn(eb);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b2f8ad2..2c943b60 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -243,6 +243,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.read.considerLoad";
   public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
       false;
+  public static final String DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY =
+      "dfs.namenode.read.considerStorageType";
+  public static final boolean DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT =
+      false;
   public static final String  DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
       "dfs.namenode.redundancy.considerLoad.factor";
   public static final double
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index c26dd90..fbe132a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -137,6 +137,9 @@ public class DatanodeManager {
   /** Whether or not to consider load for reading. */
   private final boolean readConsiderLoad;
 
+  /** Whether or not to consider storageType for reading. */
+  private final boolean readConsiderStorageType;
+
   /**
    * Whether or not to avoid using stale DataNodes for writing.
    * Note that, even if this is configured, the policy may be
@@ -320,6 +323,17 @@
     this.readConsiderLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
         DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
+    this.readConsiderStorageType = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT);
+    if (readConsiderLoad && readConsiderStorageType) {
+      LOG.warn(
+          "{} and {} are incompatible and only one can be enabled. "
+              + "Both are currently enabled.",
+          DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
+          DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY);
+    }
+
     this.avoidStaleDataNodesForWrite = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
@@ -524,7 +536,7 @@ public class DatanodeManager {
       }
     }
 
-    DatanodeInfo[] di = lb.getLocations();
+    DatanodeInfoWithStorage[] di = lb.getLocations();
     // Move decommissioned/stale datanodes to the bottom
     Arrays.sort(di, comparator);
 
@@ -547,10 +559,15 @@ public class DatanodeManager {
     lb.updateCachedStorageInfo();
   }
 
-  private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
-    Consumer<List<DatanodeInfo>> secondarySort = null;
+  private Consumer<List<DatanodeInfoWithStorage>> createSecondaryNodeSorter() {
+    Consumer<List<DatanodeInfoWithStorage>> secondarySort = null;
+    if (readConsiderStorageType) {
+      Comparator<DatanodeInfoWithStorage> comp =
+          Comparator.comparing(DatanodeInfoWithStorage::getStorageType);
+      secondarySort = list -> Collections.sort(list, comp);
+    }
     if (readConsiderLoad) {
-      Comparator<DatanodeInfo> comp =
+      Comparator<DatanodeInfoWithStorage> comp =
           Comparator.comparingInt(DatanodeInfo::getXceiverCount);
       secondarySort = list -> Collections.sort(list, comp);
     }
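One subtlety in createSecondaryNodeSorter(): when both flags are enabled, the load comparator is assigned after the storage type comparator and therefore replaces it, so the load-based sort is the one that takes effect (the second new test below depends on this). The storage comparator relies on StorageType's natural enum order, which the hdfs-default.xml change below documents as fastest first. A minimal standalone sketch of that ordering, independent of any cluster state:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.fs.StorageType;

    public class StorageTypeOrderSketch {
      public static void main(String[] args) {
        // StorageType's natural (declaration) order runs fastest to slowest,
        // per the hdfs-default.xml description: RAM, SSD, Disk, Archive.
        List<StorageType> types = Arrays.asList(
            StorageType.ARCHIVE, StorageType.DISK,
            StorageType.SSD, StorageType.RAM_DISK);
        Collections.sort(types);
        // Prints [RAM_DISK, SSD, DISK, ARCHIVE]: faster media sort first,
        // which is the order the secondary sorter applies to replicas.
        System.out.println(types);
      }
    }
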
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 66a8713..88d7114 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -43,6 +43,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantLock;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -935,6 +936,11 @@ public class CacheManager {
     }
   }
 
+  @SuppressFBWarnings(
+      value="EC_UNRELATED_TYPES",
+      justification="HDFS-15255 Asked Wei-Chiu and Pifta to review this" +
+          " warning and we all agree the code is OK and the warning is not " +
+          "needed")
   private void setCachedLocations(LocatedBlock block) {
     CachedBlock cachedBlock =
         new CachedBlock(block.getBlock().getBlockId(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bf18f3a..ccd20d5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -327,9 +327,23 @@
   <description>
    Decides whether the target node's load is considered when sorting block locations for reads.
    Disabled by default.
+    It is not possible to enable this feature along with dfs.namenode.read.considerStorageType, as only one sort can
+    be enabled at a time.
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.read.considerStorageType</name>
+  <value>false</value>
+  <description>
+    Decides whether the target node's storage type is considered when sorting block locations for reads. Locations
+    at the same network distance are sorted by storage speed, fastest first (RAM, SSD, Disk, Archive). Disabled by
+    default, in which case locations at the same distance are ordered randomly.
+    It is not possible to enable this feature along with dfs.namenode.read.considerLoad, as only one sort can be
+    enabled at a time.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.httpserver.filter.handlers</name>
   <value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
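Enabling the new behaviour amounts to flipping the boolean key on the NameNode, either in hdfs-site.xml or programmatically through a Configuration object. A minimal sketch using the DFSConfigKeys constants added above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableStorageTypeSort {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Prefer faster storage media when sorting read locations.
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY, true);
        // The two read sorts are mutually exclusive, so leave the load-based
        // sort at its default of false.
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, false);
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY, false));
      }
    }
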
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index bdc342a..f2d5805 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -35,9 +35,11 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -273,7 +275,10 @@ public class TestDFSInputStream {
       DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
               1112, 1113, 1114);
       DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
-      when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfoWithStorage dnInfoStorage =
+          new DatanodeInfoWithStorage(dnInfo, "DISK", StorageType.DISK);
+      when(lb.getLocations()).thenReturn(
+          new DatanodeInfoWithStorage[] {dnInfoStorage});
       DatanodeInfo retDNInfo =
               dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
       assertEquals(dnInfo, retDNInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index ef9758a..cdce754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -668,6 +668,179 @@ public class TestDatanodeManager {
     assertEquals(2, ipSet.size());
   }
 
+  @Test
+  public void testGetBlockLocationConsiderStorageType()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass()
+        .getResource("/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, 2 per rack, with different storage types.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    List<StorageType> storageTypesList =
+        new ArrayList<>(Arrays.asList(StorageType.ARCHIVE, StorageType.DISK,
+            StorageType.SSD, StorageType.DEFAULT, StorageType.SSD));
+    StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test a client located at locs[3], inside the cluster.
+    final String targetIpInCluster = locs[3].getIpAddr();
+    dm.sortLocatedBlocks(targetIpInCluster, blocks);
+    DatanodeInfo[] sortedLocs = block.getLocations();
+    assertEquals(totalDNs, sortedLocs.length);
+    // Ensure the local node is first.
+    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+    // Ensure the node with the faster storage type is chosen when the
+    // distance is the same.
+    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs[2].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+    // Test client not in cluster but same rack with locs[3].
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+    DatanodeInfo[] sortedLocs2 = block.getLocations();
+    assertEquals(totalDNs, sortedLocs2.length);
+    // Ensure the local rack is first and the faster storage type node is
+    // chosen when the distance is the same.
+    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs2[2].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs2[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs2[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+  }
+
+  @Test
+  public void testGetBlockLocationConsiderStorageTypeAndLoad()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass()
+        .getResource("/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, 2 per rack, with different storage types and loads.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    List<StorageType> storageTypesList =
+        new ArrayList<>(Arrays.asList(StorageType.DISK, StorageType.DISK,
+            StorageType.DEFAULT, StorageType.SSD, StorageType.SSD));
+    StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+
+      // Set load for datanodes.
+      locs[i].setXceiverCount(i);
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test a client located at locs[3], inside the cluster.
+    final String targetIpInCluster = locs[3].getIpAddr();
+    dm.sortLocatedBlocks(targetIpInCluster, blocks);
+    DatanodeInfo[] sortedLocs = block.getLocations();
+    assertEquals(totalDNs, sortedLocs.length);
+    // Ensure the local node is first.
+    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+    // Ensure the lightly loaded node is chosen ahead of the faster storage
+    // type node when the distance is the same (the load sort takes precedence).
+    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+    // Test a client not in the cluster but on the same rack as locs[3].
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+    DatanodeInfo[] sortedLocs2 = block.getLocations();
+    assertEquals(totalDNs, sortedLocs2.length);
+    // Ensure the local rack is first and the lightly loaded node is chosen
+    // ahead of the faster storage type node when the distance is the same.
+    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs2[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+  }
+
   /**
    * Test whether removing a host from the includes list without adding it to
    * the excludes list will exclude it from data node reports.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org