You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/05/02 21:05:50 UTC

[hadoop] branch trunk updated: HDFS-16521. DFS API to retrieve slow datanodes (#4107)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2dfa928a201 HDFS-16521. DFS API to retrieve slow datanodes (#4107)
2dfa928a201 is described below

commit 2dfa928a201560bc739ff5e4fe29f8bb19188927
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Mon May 2 14:05:40 2022 -0700

    HDFS-16521. DFS API to retrieve slow datanodes (#4107)
    
    Signed-off-by: stack <st...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |   8 ++
 .../apache/hadoop/hdfs/DistributedFileSystem.java  |  11 ++
 .../hadoop/hdfs/ViewDistributedFileSystem.java     |  10 ++
 .../hadoop/hdfs/protocol/ClientProtocol.java       |  12 +++
 .../ClientNamenodeProtocolTranslatorPB.java        |  13 +++
 .../src/main/proto/ClientNamenodeProtocol.proto    |   9 ++
 .../apache/hadoop/hdfs/protocol/TestReadOnly.java  |   3 +-
 .../federation/router/RouterClientProtocol.java    |   6 ++
 .../server/federation/router/RouterRpcServer.java  |  71 +++++++++----
 .../server/federation/router/TestRouterRpc.java    |   1 +
 ...ientNamenodeProtocolServerSideTranslatorPB.java |  16 +++
 .../server/blockmanagement/DatanodeManager.java    |  33 ++++--
 .../datanode/metrics/DataNodePeerMetrics.java      |  32 ++++--
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  33 ++++--
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   6 ++
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java     |  28 +++++-
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md  |   4 +-
 .../apache/hadoop/hdfs/TestSlowDatanodeReport.java | 112 +++++++++++++++++++++
 18 files changed, 362 insertions(+), 46 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 97045aff909..2682549cba1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3491,4 +3491,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private boolean isLocatedBlocksRefresherEnabled() {
     return clientContext.isLocatedBlocksRefresherEnabled();
   }
+
+  public DatanodeInfo[] slowDatanodeReport() throws IOException {
+    checkOpen();
+    try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
+      return namenode.getSlowDatanodeReport();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 350abab352b..93db332d738 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -3887,4 +3887,15 @@ public class DistributedFileSystem extends FileSystem
       throws IOException {
     return new FileSystemMultipartUploaderBuilder(this, basePath);
   }
+
+  /**
+   * Retrieve stats for slow running datanodes.
+   *
+   * @return An array of slow datanode info.
+   * @throws IOException If an I/O error occurs.
+   */
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    return dfs.slowDatanodeReport();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
index 2a4469a5ae5..d34df37ae61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
@@ -2318,4 +2318,14 @@ public class ViewDistributedFileSystem extends DistributedFileSystem {
     }
     return this.vfs.getUsed();
   }
+
+  @Override
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getSlowDatanodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
+    return defaultDFS.getSlowDatanodeStats();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index ea90645ca08..e9ae803a541 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1868,4 +1868,16 @@ public interface ClientProtocol {
    */
   @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Get report on all of the slow Datanodes. Slow running datanodes are identified based on
+   * the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
+   *
+   * @return Datanode report for slow running datanodes.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Idempotent
+  @ReadOnly
+  DatanodeInfo[] getSlowDatanodeReport() throws IOException;
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 2668ec1cefe..7b8ca42c50f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -143,6 +143,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLoc
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2065,6 +2066,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    GetSlowDatanodeReportRequestProto req =
+        GetSlowDatanodeReportRequestProto.newBuilder().build();
+    try {
+      return PBHelperClient.convert(
+          rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState()
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 20967cc13ab..ebc56e9e675 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -424,6 +424,13 @@ message GetPreferredBlockSizeResponseProto {
   required uint64 bsize = 1;
 }
 
+message GetSlowDatanodeReportRequestProto {
+}
+
+message GetSlowDatanodeReportResponseProto {
+  repeated DatanodeInfoProto datanodeInfoProto = 1;
+}
+
 enum SafeModeActionProto {
   SAFEMODE_LEAVE = 1;
   SAFEMODE_ENTER = 2;
@@ -1070,4 +1077,6 @@ service ClientNamenodeProtocol {
       returns(SatisfyStoragePolicyResponseProto);
   rpc getHAServiceState(HAServiceStateRequestProto)
       returns(HAServiceStateResponseProto);
+  rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
+      returns(GetSlowDatanodeReportResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
index 4e6f4e3f4ba..f7ea7bcd76d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
@@ -76,7 +76,8 @@ public class TestReadOnly {
           "getQuotaUsage",
           "msync",
           "getHAServiceState",
-          "getECTopologyResultForPolicies"
+          "getECTopologyResultForPolicies",
+          "getSlowDatanodeReport"
       )
   );
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index d5bbdf046c2..1bd7d65836d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -1815,6 +1815,12 @@ public class RouterClientProtocol implements ClientProtocol {
     storagePolicy.satisfyStoragePolicy(path);
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReport(true, 0);
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 69f300bfb7d..2bec2c726a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1095,24 +1095,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
         rpcClient.invokeConcurrent(nss, method, requireResponse, false,
             timeOutMs, DatanodeInfo[].class);
-    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
-        results.entrySet()) {
-      FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = entry.getValue();
-      for (DatanodeInfo node : result) {
-        String nodeId = node.getXferAddr();
-        DatanodeInfo dn = datanodesMap.get(nodeId);
-        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
-          // Add the subcluster as a suffix to the network location
-          node.setNetworkLocation(
-              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
-              node.getNetworkLocation());
-          datanodesMap.put(nodeId, node);
-        } else {
-          LOG.debug("{} is in multiple subclusters", nodeId);
-        }
-      }
-    }
+    updateDnMap(results, datanodesMap);
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
     return toArray(datanodes, DatanodeInfo.class);
@@ -1578,6 +1561,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     clientProto.satisfyStoragePolicy(path);
   }
 
+  @Override // ClientProtocol
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    return clientProto.getSlowDatanodeReport();
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize, long hotBlockTimeInterval) throws IOException {
@@ -1994,6 +1982,53 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return rpcClient.refreshFairnessPolicyController(new Configuration());
   }
 
+  /**
+   * Get the slow running datanodes report with a timeout.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return Array of slow datanodes reported by the namespaces.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+    updateDnMap(results, datanodesMap);
+    // Map -> Array
+    Collection<DatanodeInfo> datanodes = datanodesMap.values();
+    return toArray(datanodes, DatanodeInfo.class);
+  }
+
+  private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
+      Map<String, DatanodeInfo> datanodesMap) {
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      DatanodeInfo[] result = entry.getValue();
+      for (DatanodeInfo node : result) {
+        String nodeId = node.getXferAddr();
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
+          // Add the subcluster as a prefix to the network location
+          node.setNetworkLocation(
+              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
+                  node.getNetworkLocation());
+          datanodesMap.put(nodeId, node);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
+        }
+      }
+    }
+  }
+
   /**
    * Deals with loading datanode report into the cache and refresh.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index ae58bf8b612..96e77d58007 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -704,6 +704,7 @@ public class TestRouterRpc {
 
     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
     final Map<Integer, String> routerDNMap = new TreeMap<>();
     for (DatanodeInfo dn : combinedData) {
       String subcluster = dn.getNetworkLocation().split("/")[1];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 5132afaa4b1..0164f25460d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -156,6 +156,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuo
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2058,4 +2060,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller,
+      GetSlowDatanodeReportRequestProto request) throws ServiceException {
+    try {
+      List<? extends DatanodeInfoProto> result =
+          PBHelperClient.convert(server.getSlowDatanodeReport());
+      return GetSlowDatanodeReportResponseProto.newBuilder()
+          .addAllDatanodeInfoProto(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 0dc33c818c3..b1dbbdde3b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
 
 import org.apache.hadoop.fs.StorageType;
@@ -1665,7 +1667,17 @@ public class DatanodeManager {
     }
     return nodes;
   }
-  
+
+  public List<DatanodeDescriptor> getAllSlowDataNodes() {
+    if (slowPeerTracker == null) {
+      LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.",
+          DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
+      return ImmutableList.of();
+    }
+    List<String> slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes());
+    return getDnDescriptorsFromIpAddr(slowNodes);
+  }
+
   /**
    * Checks if name resolution was successful for the given address.  If IP
    * address and host name are the same, then it means name resolution has
@@ -2148,19 +2160,26 @@ public class DatanodeManager {
     List<String> slowNodes;
     Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
     slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
-    for (String slowNode : slowNodes) {
-      if (StringUtils.isBlank(slowNode)
-              || !slowNode.contains(IP_PORT_SEPARATOR)) {
+    List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
+    datanodeDescriptors.forEach(
+        datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
+    return slowPeersUuidSet;
+  }
+
+  private List<DatanodeDescriptor> getDnDescriptorsFromIpAddr(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = new ArrayList<>();
+    for (String node : nodes) {
+      if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) {
         continue;
       }
-      String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
+      String ipAddr = node.split(IP_PORT_SEPARATOR)[0];
       DatanodeDescriptor datanodeByHost =
           host2DatanodeMap.getDatanodeByHost(ipAddr);
       if (datanodeByHost != null) {
-        slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
+        datanodeDescriptors.add(datanodeByHost);
       }
     }
-    return slowPeersUuidSet;
+    return datanodeDescriptors;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index f62a7b504a1..2e456b67ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -54,6 +54,8 @@ public class DataNodePeerMetrics {
 
   private final String name;
 
+  // Strictly to be used by test code only. Source code is not supposed to use this.
+  private Map<String, Double> testOutlier = null;
 
   private final OutlierDetector slowNodeDetector;
 
@@ -142,14 +144,28 @@ public class DataNodePeerMetrics {
    * than their peers.
    */
   public Map<String, Double> getOutliers() {
-    // This maps the metric name to the aggregate latency.
-    // The metric name is the datanode ID.
-    final Map<String, Double> stats =
-        sendPacketDownstreamRollingAverages.getStats(
-            minOutlierDetectionSamples);
-    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
-
-    return slowNodeDetector.getOutliers(stats);
+    // testOutlier stays null in production code; only tests set it via setTestOutliers.
+    if (testOutlier == null) {
+      // This maps the metric name to the aggregate latency.
+      // The metric name is the datanode ID.
+      final Map<String, Double> stats =
+          sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
+      LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+      return slowNodeDetector.getOutliers(stats);
+    } else {
+      // this happens only for test code.
+      return testOutlier;
+    }
+  }
+
+  /**
+   * Strictly to be used by test code only. Source code is not supposed to use this. This method
+   * directly sets outlier mapping so that aggregate latency metrics are not calculated for tests.
+   *
+   * @param outlier outlier directly set by tests.
+   */
+  public void setTestOutliers(Map<String, Double> outlier) {
+    this.testOutlier = outlier;
   }
 
   public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 259ca9d5b8f..ab3c49fc264 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4913,6 +4913,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  DatanodeInfo[] slowDataNodesReport() throws IOException {
+    String operationName = "slowDataNodesReport";
+    DatanodeInfo[] datanodeInfos;
+    checkOperation(OperationCategory.UNCHECKED);
+    readLock();
+    try {
+      checkOperation(OperationCategory.UNCHECKED);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();
+      final List<DatanodeDescriptor> results = dm.getAllSlowDataNodes();
+      datanodeInfos = getDatanodeInfoFromDescriptors(results);
+    } finally {
+      readUnlock(operationName, getLockReportInfoSupplier(null));
+    }
+    logAuditEvent(true, operationName, null);
+    return datanodeInfos;
+  }
+
+  private DatanodeInfo[] getDatanodeInfoFromDescriptors(List<DatanodeDescriptor> results) {
+    DatanodeInfo[] datanodeInfos = new DatanodeInfo[results.size()];
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      datanodeInfos[i] = new DatanodeInfoBuilder().setFrom(results.get(i)).build();
+      datanodeInfos[i].setNumBlocks(results.get(i).numBlocks());
+    }
+    return datanodeInfos;
+  }
+
   DatanodeInfo[] datanodeReport(final DatanodeReportType type)
       throws IOException {
     String operationName = "datanodeReport";
@@ -4924,12 +4950,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.UNCHECKED);
       final DatanodeManager dm = getBlockManager().getDatanodeManager();      
       final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
-      arr = new DatanodeInfo[results.size()];
-      for (int i=0; i<arr.length; i++) {
-        arr[i] = new DatanodeInfoBuilder().setFrom(results.get(i))
-            .build();
-        arr[i].setNumBlocks(results.get(i).numBlocks());
-      }
+      arr = getDatanodeInfoFromDescriptors(results);
     } finally {
       readUnlock(operationName, getLockReportInfoSupplier(null));
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 73957bcafeb..c86211b058b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1509,6 +1509,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    checkNNStartup();
+    return namesystem.slowDataNodesReport();
+  }
+
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
                        StorageType type)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index dec8bef24b4..3462d3dafcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -433,7 +433,7 @@ public class DFSAdmin extends FsShell {
    */
   private static final String commonUsageSummary =
     "\t[-report [-live] [-dead] [-decommissioning] " +
-    "[-enteringmaintenance] [-inmaintenance]]\n" +
+      "[-enteringmaintenance] [-inmaintenance] [-slownodes]]\n" +
     "\t[-safemode <enter | leave | get | wait | forceExit>]\n" +
     "\t[-saveNamespace [-beforeShutdown]]\n" +
     "\t[-rollEdits]\n" +
@@ -587,11 +587,13 @@ public class DFSAdmin extends FsShell {
         StringUtils.popOption("-enteringmaintenance", args);
     final boolean listInMaintenance =
         StringUtils.popOption("-inmaintenance", args);
+    final boolean listSlowNodes =
+        StringUtils.popOption("-slownodes", args);
 
 
     // If no filter flags are found, then list all DN types
     boolean listAll = (!listLive && !listDead && !listDecommissioning
-        && !listEnteringMaintenance && !listInMaintenance);
+        && !listEnteringMaintenance && !listInMaintenance && !listSlowNodes);
 
     if (listAll || listLive) {
       printDataNodeReports(dfs, DatanodeReportType.LIVE, listLive, "Live");
@@ -615,6 +617,10 @@ public class DFSAdmin extends FsShell {
       printDataNodeReports(dfs, DatanodeReportType.IN_MAINTENANCE,
           listInMaintenance, "In maintenance");
     }
+
+    if (listAll || listSlowNodes) {
+      printSlowDataNodeReports(dfs, listSlowNodes, "Slow");
+    }
   }
 
   private static void printDataNodeReports(DistributedFileSystem dfs,
@@ -632,6 +638,20 @@ public class DFSAdmin extends FsShell {
     }
   }
 
+  private static void printSlowDataNodeReports(DistributedFileSystem dfs, boolean listNodes,
+      String nodeState) throws IOException {
+    DatanodeInfo[] nodes = dfs.getSlowDatanodeStats();
+    if (nodes.length > 0 || listNodes) {
+      System.out.println(nodeState + " datanodes (" + nodes.length + "):\n");
+    }
+    if (nodes.length > 0) {
+      for (DatanodeInfo dn : nodes) {
+        System.out.println(dn.getDatanodeReport());
+        System.out.println();
+      }
+    }
+  }
+
   /**
    * Safe mode maintenance command.
    * Usage: hdfs dfsadmin -safemode [enter | leave | get | wait | forceExit]
@@ -1148,7 +1168,7 @@ public class DFSAdmin extends FsShell {
       commonUsageSummary;
 
     String report ="-report [-live] [-dead] [-decommissioning] "
-        + "[-enteringmaintenance] [-inmaintenance]:\n" +
+        + "[-enteringmaintenance] [-inmaintenance] [-slownodes]:\n" +
         "\tReports basic filesystem information and statistics. \n" +
         "\tThe dfs usage can be different from \"du\" usage, because it\n" +
         "\tmeasures raw space used by replication, checksums, snapshots\n" +
@@ -2126,7 +2146,7 @@ public class DFSAdmin extends FsShell {
     if ("-report".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-report] [-live] [-dead] [-decommissioning]"
-          + " [-enteringmaintenance] [-inmaintenance]");
+          + " [-enteringmaintenance] [-inmaintenance] [-slownodes]");
     } else if ("-safemode".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-safemode enter | leave | get | wait | forceExit]");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index a11f2098705..4fe788dc62c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -356,7 +356,7 @@ Runs a HDFS datanode.
 
 Usage:
 
-        hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance]]
+        hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance] [-slownodes]]
         hdfs dfsadmin [-safemode enter | leave | get | wait | forceExit]
         hdfs dfsadmin [-saveNamespace [-beforeShutdown]]
         hdfs dfsadmin [-rollEdits]
@@ -394,7 +394,7 @@ Usage:
 
 | COMMAND\_OPTION | Description |
 |:---- |:---- |
-| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. |
+| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` `[-slownodes]` | Reports basic filesystem information and statistics. The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots, etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. Filters are either based on the DN state (e.g. live, dead, decommissioning) or the nature of the DN (e.g. slow [...]
 | `-safemode` enter\|leave\|get\|wait\|forceExit | Safe mode maintenance command. Safe mode is a Namenode state in which it <br/>1. does not accept changes to the name space (read-only) <br/>2. does not replicate or delete blocks. <br/>Safe mode is entered automatically at Namenode startup, and leaves safe mode automatically when the configured minimum percentage of blocks satisfies the minimum replication condition. If Namenode detects any anomaly then it will linger in safe mode till t [...]
 | `-saveNamespace` `[-beforeShutdown]` | Save current namespace into storage directories and reset edits log. Requires safe mode. If the "beforeShutdown" option is given, the NameNode does a checkpoint if and only if no checkpoint has been done during a time window (a configurable number of checkpoint periods). This is usually used before shutting down the NameNode to prevent potential fsimage/editlog corruption. |
 | `-rollEdits` | Rolls the edit log on the active NameNode. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
new file mode 100644
index 00000000000..583c3159d5c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys
+    .DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
+
+/**
+ * Tests that slow datanodes reported through peer metrics are returned by
+ * {@link DistributedFileSystem#getSlowDatanodeStats()}.
+ */
+public class TestSlowDatanodeReport {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSlowDatanodeReport.class);
+
+  private MiniDFSCluster cluster;
+
+  /** Starts 3 datanodes with peer stats on and min outlier-detection nodes/samples of 1. */
+  @Before
+  public void testSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000");
+    conf.set(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
+    conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1");
+    conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+
+  /** Stops the cluster; the null check keeps a failed setup from being masked by an NPE. */
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** One datanode flags one peer as an outlier; expects exactly one slow node. */
+  @Test
+  public void testSingleNodeReport() throws Exception {
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    markSlowPeer(dataNodes.get(0), dataNodes.get(1), 15.5);
+    waitForSlowNodeCount(1, 180000);
+  }
+
+  /** Two datanodes each flag a different peer; expects exactly two slow nodes. */
+  @Test
+  public void testMultiNodesReport() throws Exception {
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    markSlowPeer(dataNodes.get(0), dataNodes.get(1), 15.5);
+    markSlowPeer(dataNodes.get(1), dataNodes.get(2), 18.7);
+    waitForSlowNodeCount(2, 200000);
+  }
+
+  /** Makes {@code reporter} report {@code slowPeer} as an outlier with the given value. */
+  private static void markSlowPeer(DataNode reporter, DataNode slowPeer, double value) {
+    reporter.getPeerMetrics().setTestOutliers(
+        ImmutableMap.of(slowPeer.getDatanodeHostname() + ":" + slowPeer.getIpcPort(), value));
+  }
+
+  /** Polls the slow datanode report until it has exactly {@code expected} entries. */
+  private void waitForSlowNodeCount(int expected, int timeoutMs) throws Exception {
+    DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
+    Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats();
+        LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo));
+        return slowNodeInfo.length == expected;
+      } catch (IOException e) {
+        LOG.error("Failed to retrieve slownode report", e);
+        return false;  // a failed call counts as "not yet"; keep polling until the timeout
+      }
+    }, 2000, timeoutMs, "Slow nodes could not be detected");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org