Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/05/05 20:55:57 UTC

[hadoop] branch branch-3.3 updated: HDFS-16521. DFS API to retrieve slow datanodes (#4107) (#4259)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4d935eaed72 HDFS-16521. DFS API to retrieve slow datanodes (#4107) (#4259)
4d935eaed72 is described below

commit 4d935eaed722061070138490a3147b5f0e3b5ae2
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Thu May 5 13:55:46 2022 -0700

    HDFS-16521. DFS API to retrieve slow datanodes (#4107) (#4259)
---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |   8 ++
 .../apache/hadoop/hdfs/DistributedFileSystem.java  |  11 ++
 .../hadoop/hdfs/ViewDistributedFileSystem.java     |  10 ++
 .../hadoop/hdfs/protocol/ClientProtocol.java       |  12 +++
 .../ClientNamenodeProtocolTranslatorPB.java        |  13 +++
 .../src/main/proto/ClientNamenodeProtocol.proto    |   9 ++
 .../apache/hadoop/hdfs/protocol/TestReadOnly.java  |   3 +-
 .../federation/router/RouterClientProtocol.java    |   6 ++
 .../server/federation/router/RouterRpcServer.java  |  72 +++++++++----
 .../server/federation/router/TestRouterRpc.java    |   1 +
 ...ientNamenodeProtocolServerSideTranslatorPB.java |  16 +++
 .../server/blockmanagement/DatanodeManager.java    |  32 ++++--
 .../datanode/metrics/DataNodePeerMetrics.java      |  32 ++++--
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  33 ++++--
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   6 ++
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java     |  28 +++++-
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md  |   4 +-
 .../apache/hadoop/hdfs/TestSlowDatanodeReport.java | 112 +++++++++++++++++++++
 .../src/test/resources/testHDFSConf.xml            |   2 +-
 19 files changed, 363 insertions(+), 47 deletions(-)
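
In short, the patch threads a new getSlowDatanodeReport() RPC from the client API through the NameNode (and, for federated clusters, the RBF router). A minimal usage sketch against the new public entry point, DistributedFileSystem#getSlowDatanodeStats(); the NameNode URI is a placeholder, and the cluster is assumed to have slow peer tracking enabled (see the DatanodeManager hunk below):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class SlowNodesExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hdfs://nn.example.com:8020 is a hypothetical NameNode address.
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn.example.com:8020"), conf)) {
          DatanodeInfo[] slow = ((DistributedFileSystem) fs).getSlowDatanodeStats();
          for (DatanodeInfo dn : slow) {
            System.out.println(dn.getDatanodeReport());
          }
        }
      }
    }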

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index ea9736df17c..ffd7256bb59 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3434,4 +3434,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private boolean isLocatedBlocksRefresherEnabled() {
     return clientContext.isLocatedBlocksRefresherEnabled();
   }
+
+  public DatanodeInfo[] slowDatanodeReport() throws IOException {
+    checkOpen();
+    try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
+      return namenode.getSlowDatanodeReport();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 4525dbb43dc..f74db63d4f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -3651,4 +3651,15 @@ public class DistributedFileSystem extends FileSystem
       throws IOException {
     return new FileSystemMultipartUploaderBuilder(this, basePath);
   }
+
+  /**
+   * Retrieve stats for slow running datanodes.
+   *
+   * @return An array of slow datanode info.
+   * @throws IOException If an I/O error occurs.
+   */
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    return dfs.slowDatanodeReport();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
index 29404ae366b..6f8ebae696a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
@@ -2318,4 +2318,14 @@ public class ViewDistributedFileSystem extends DistributedFileSystem {
     }
     return this.vfs.getUsed();
   }
+
+  @Override
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getSlowDatanodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
+    return defaultDFS.getSlowDatanodeStats();
+  }
+
 }
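
A note on the ViewDistributedFileSystem override above: when no mount-table view is active (this.vfs == null) the call falls through to the plain DistributedFileSystem path; otherwise it is served by the default DFS only, and checkDefaultDFS rejects the call when the view has no default file system configured.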
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 2f4dfb9b46c..086cfacb4e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1856,4 +1856,16 @@ public interface ClientProtocol {
    */
   @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Get report on all of the slow Datanodes. Slow running datanodes are identified based on
+   * the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
+   *
+   * @return Datanode report for slow running datanodes.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Idempotent
+  @ReadOnly
+  DatanodeInfo[] getSlowDatanodeReport() throws IOException;
+
 }
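
Both annotations on the new protocol method matter operationally: @Idempotent makes the RPC safe for client retries, and @ReadOnly marks it eligible to be served by Observer NameNodes. Read-only methods must also be registered in TestReadOnly's allow-list, which the test hunk further below takes care of.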
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 569f1add9e1..ca5d978713f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLoc
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2044,6 +2045,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    GetSlowDatanodeReportRequestProto req =
+        GetSlowDatanodeReportRequestProto.newBuilder().build();
+    try {
+      return PBHelperClient.convert(
+          rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState()
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 3fb57bc02d0..49205996fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -416,6 +416,13 @@ message GetPreferredBlockSizeResponseProto {
   required uint64 bsize = 1;
 }
 
+message GetSlowDatanodeReportRequestProto {
+}
+
+message GetSlowDatanodeReportResponseProto {
+  repeated DatanodeInfoProto datanodeInfoProto = 1;
+}
+
 enum SafeModeActionProto {
   SAFEMODE_LEAVE = 1;
   SAFEMODE_ENTER = 2;
@@ -1060,4 +1067,6 @@ service ClientNamenodeProtocol {
       returns(SatisfyStoragePolicyResponseProto);
   rpc getHAServiceState(HAServiceStateRequestProto)
       returns(HAServiceStateResponseProto);
+  rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
+      returns(GetSlowDatanodeReportResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
index 41069b43978..7e74b3354a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
@@ -75,7 +75,8 @@ public class TestReadOnly {
           "getQuotaUsage",
           "msync",
           "getHAServiceState",
-          "getECTopologyResultForPolicies"
+          "getECTopologyResultForPolicies",
+          "getSlowDatanodeReport"
       )
   );
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 744d35ded9e..f311b2f3947 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -1791,6 +1791,12 @@ public class RouterClientProtocol implements ClientProtocol {
     storagePolicy.satisfyStoragePolicy(path);
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReport(true, 0);
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 559a7a22094..13189964d19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -898,24 +898,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
         rpcClient.invokeConcurrent(nss, method, requireResponse, false,
             timeOutMs, DatanodeInfo[].class);
-    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
-        results.entrySet()) {
-      FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = entry.getValue();
-      for (DatanodeInfo node : result) {
-        String nodeId = node.getXferAddr();
-        DatanodeInfo dn = datanodesMap.get(nodeId);
-        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
-          // Add the subcluster as a suffix to the network location
-          node.setNetworkLocation(
-              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
-              node.getNetworkLocation());
-          datanodesMap.put(nodeId, node);
-        } else {
-          LOG.debug("{} is in multiple subclusters", nodeId);
-        }
-      }
-    }
+    updateDnMap(results, datanodesMap);
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
     return toArray(datanodes, DatanodeInfo.class);
@@ -1358,6 +1341,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     clientProto.satisfyStoragePolicy(path);
   }
 
+  @Override // ClientProtocol
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    return clientProto.getSlowDatanodeReport();
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize) throws IOException {
@@ -1757,4 +1745,52 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
+
+  /**
+   * Get the slow running datanodes report with a timeout.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+    updateDnMap(results, datanodesMap);
+    // Map -> Array
+    Collection<DatanodeInfo> datanodes = datanodesMap.values();
+    return toArray(datanodes, DatanodeInfo.class);
+  }
+
+  private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
+      Map<String, DatanodeInfo> datanodesMap) {
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      DatanodeInfo[] result = entry.getValue();
+      for (DatanodeInfo node : result) {
+        String nodeId = node.getXferAddr();
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
+          // Add the subcluster as a suffix to the network location
+          node.setNetworkLocation(
+              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
+                  node.getNetworkLocation());
+          datanodesMap.put(nodeId, node);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
+        }
+      }
+    }
+  }
+
 }
\ No newline at end of file
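
On RBF deployments, the router fans the new call out to every nameservice via invokeConcurrent and merges the per-namespace results. The extracted updateDnMap helper keys nodes by their transfer address, keeps the entry with the latest lastUpdate time, and prefixes the network location with the nameservice ID, so a node at /rack1 reported by ns1 surfaces as /ns1/rack1. Note that RouterClientProtocol invokes it with requireResponse set to true and a timeOutMs of 0, i.e. every namespace must answer and no explicit per-call timeout is applied.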
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 94b3a5e90a8..ae0908894de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -690,6 +690,7 @@ public class TestRouterRpc {
 
     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
     final Map<Integer, String> routerDNMap = new TreeMap<>();
     for (DatanodeInfo dn : combinedData) {
       String subcluster = dn.getNetworkLocation().split("/")[1];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index e0afe006a2f..ba3b508bb8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -155,6 +155,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuo
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2034,4 +2036,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller,
+      GetSlowDatanodeReportRequestProto request) throws ServiceException {
+    try {
+      List<? extends DatanodeInfoProto> result =
+          PBHelperClient.convert(server.getSlowDatanodeReport());
+      return GetSlowDatanodeReportResponseProto.newBuilder()
+          .addAllDatanodeInfoProto(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b7703eb4856..c9365206daf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -1650,7 +1651,17 @@ public class DatanodeManager {
     }
     return nodes;
   }
-  
+
+  public List<DatanodeDescriptor> getAllSlowDataNodes() {
+    if (slowPeerTracker == null) {
+      LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.",
+          DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
+      return ImmutableList.of();
+    }
+    List<String> slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes());
+    return getDnDescriptorsFromIpAddr(slowNodes);
+  }
+
   /**
    * Checks if name resolution was successful for the given address.  If IP
    * address and host name are the same, then it means name resolution has
@@ -2133,19 +2144,26 @@ public class DatanodeManager {
     List<String> slowNodes;
     Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
     slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
-    for (String slowNode : slowNodes) {
-      if (StringUtils.isBlank(slowNode)
-              || !slowNode.contains(IP_PORT_SEPARATOR)) {
+    List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
+    datanodeDescriptors.forEach(
+        datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
+    return slowPeersUuidSet;
+  }
+
+  private List<DatanodeDescriptor> getDnDescriptorsFromIpAddr(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = new ArrayList<>();
+    for (String node : nodes) {
+      if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) {
         continue;
       }
-      String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
+      String ipAddr = node.split(IP_PORT_SEPARATOR)[0];
       DatanodeDescriptor datanodeByHost =
           host2DatanodeMap.getDatanodeByHost(ipAddr);
       if (datanodeByHost != null) {
-        slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
+        datanodeDescriptors.add(datanodeByHost);
       }
     }
-    return slowPeersUuidSet;
+    return datanodeDescriptors;
   }
 
   /**
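
getAllSlowDataNodes() deliberately degrades to an empty list when slow peer tracking is off. A minimal configuration sketch to turn the tracking on, mirroring the test setup at the end of this patch (the interval value is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    // Enable peer-latency stats so the slowPeerTracker is instantiated.
    conf.set(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
    // How often datanodes report detected outliers; 1000 ms is a test-friendly value.
    conf.set(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000");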
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index f62a7b504a1..2e456b67ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -54,6 +54,8 @@ public class DataNodePeerMetrics {
 
   private final String name;
 
+  // Strictly to be used by test code only. Source code is not supposed to use this.
+  private Map<String, Double> testOutlier = null;
 
   private final OutlierDetector slowNodeDetector;
 
@@ -142,14 +144,28 @@ public class DataNodePeerMetrics {
    * than their peers.
    */
   public Map<String, Double> getOutliers() {
-    // This maps the metric name to the aggregate latency.
-    // The metric name is the datanode ID.
-    final Map<String, Double> stats =
-        sendPacketDownstreamRollingAverages.getStats(
-            minOutlierDetectionSamples);
-    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
-
-    return slowNodeDetector.getOutliers(stats);
+    // outlier must be null for source code.
+    if (testOutlier == null) {
+      // This maps the metric name to the aggregate latency.
+      // The metric name is the datanode ID.
+      final Map<String, Double> stats =
+          sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
+      LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+      return slowNodeDetector.getOutliers(stats);
+    } else {
+      // this happens only for test code.
+      return testOutlier;
+    }
+  }
+
+  /**
+   * Strictly to be used by test code only. Source code is not supposed to use this. This method
+   * directly sets outlier mapping so that aggregate latency metrics are not calculated for tests.
+   *
+   * @param outlier outlier directly set by tests.
+   */
+  public void setTestOutliers(Map<String, Double> outlier) {
+    this.testOutlier = outlier;
   }
 
   public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
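
The testOutlier hook lets tests inject an outlier map directly instead of accumulating real latency samples; TestSlowDatanodeReport below uses it as, e.g., peerMetrics.setTestOutliers(ImmutableMap.of("host:port", 15.5)), where "host:port" stands for the target datanode's hostname and IPC port.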
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3ce8a80be65..6ff8fc91b48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4811,6 +4811,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  DatanodeInfo[] slowDataNodesReport() throws IOException {
+    String operationName = "slowDataNodesReport";
+    DatanodeInfo[] datanodeInfos;
+    checkOperation(OperationCategory.UNCHECKED);
+    readLock();
+    try {
+      checkOperation(OperationCategory.UNCHECKED);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();
+      final List<DatanodeDescriptor> results = dm.getAllSlowDataNodes();
+      datanodeInfos = getDatanodeInfoFromDescriptors(results);
+    } finally {
+      readUnlock(operationName);
+    }
+    logAuditEvent(true, operationName, null);
+    return datanodeInfos;
+  }
+
+  private DatanodeInfo[] getDatanodeInfoFromDescriptors(List<DatanodeDescriptor> results) {
+    DatanodeInfo[] datanodeInfos = new DatanodeInfo[results.size()];
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      datanodeInfos[i] = new DatanodeInfoBuilder().setFrom(results.get(i)).build();
+      datanodeInfos[i].setNumBlocks(results.get(i).numBlocks());
+    }
+    return datanodeInfos;
+  }
+
   DatanodeInfo[] datanodeReport(final DatanodeReportType type)
       throws IOException {
     String operationName = "datanodeReport";
@@ -4822,12 +4848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.UNCHECKED);
       final DatanodeManager dm = getBlockManager().getDatanodeManager();      
       final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
-      arr = new DatanodeInfo[results.size()];
-      for (int i=0; i<arr.length; i++) {
-        arr[i] = new DatanodeInfoBuilder().setFrom(results.get(i))
-            .build();
-        arr[i].setNumBlocks(results.get(i).numBlocks());
-      }
+      arr = getDatanodeInfoFromDescriptors(results);
     } finally {
       readUnlock(operationName);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e6610e0e505..f1014a90fc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1499,6 +1499,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
   }
 
+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    checkNNStartup();
+    return namesystem.slowDataNodesReport();
+  }
+
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
                        StorageType type)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index b57637df731..38d6c1c3710 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -429,7 +429,7 @@ public class DFSAdmin extends FsShell {
    */
   private static final String commonUsageSummary =
     "\t[-report [-live] [-dead] [-decommissioning] " +
-    "[-enteringmaintenance] [-inmaintenance]]\n" +
+      "[-enteringmaintenance] [-inmaintenance] [-slownodes]]\n" +
     "\t[-safemode <enter | leave | get | wait | forceExit>]\n" +
     "\t[-saveNamespace [-beforeShutdown]]\n" +
     "\t[-rollEdits]\n" +
@@ -582,11 +582,13 @@ public class DFSAdmin extends FsShell {
         StringUtils.popOption("-enteringmaintenance", args);
     final boolean listInMaintenance =
         StringUtils.popOption("-inmaintenance", args);
+    final boolean listSlowNodes =
+        StringUtils.popOption("-slownodes", args);
 
 
     // If no filter flags are found, then list all DN types
     boolean listAll = (!listLive && !listDead && !listDecommissioning
-        && !listEnteringMaintenance && !listInMaintenance);
+        && !listEnteringMaintenance && !listInMaintenance && !listSlowNodes);
 
     if (listAll || listLive) {
       printDataNodeReports(dfs, DatanodeReportType.LIVE, listLive, "Live");
@@ -610,6 +612,10 @@ public class DFSAdmin extends FsShell {
       printDataNodeReports(dfs, DatanodeReportType.IN_MAINTENANCE,
           listInMaintenance, "In maintenance");
     }
+
+    if (listAll || listSlowNodes) {
+      printSlowDataNodeReports(dfs, listSlowNodes, "Slow");
+    }
   }
 
   private static void printDataNodeReports(DistributedFileSystem dfs,
@@ -627,6 +633,20 @@ public class DFSAdmin extends FsShell {
     }
   }
 
+  private static void printSlowDataNodeReports(DistributedFileSystem dfs, boolean listNodes,
+      String nodeState) throws IOException {
+    DatanodeInfo[] nodes = dfs.getSlowDatanodeStats();
+    if (nodes.length > 0 || listNodes) {
+      System.out.println(nodeState + " datanodes (" + nodes.length + "):\n");
+    }
+    if (nodes.length > 0) {
+      for (DatanodeInfo dn : nodes) {
+        System.out.println(dn.getDatanodeReport());
+        System.out.println();
+      }
+    }
+  }
+
   /**
    * Safe mode maintenance command.
    * Usage: hdfs dfsadmin -safemode [enter | leave | get | wait | forceExit]
@@ -1103,7 +1123,7 @@ public class DFSAdmin extends FsShell {
       commonUsageSummary;
 
     String report ="-report [-live] [-dead] [-decommissioning] "
-        + "[-enteringmaintenance] [-inmaintenance]:\n" +
+        + "[-enteringmaintenance] [-inmaintenance] [-slownodes]:\n" +
         "\tReports basic filesystem information and statistics. \n" +
         "\tThe dfs usage can be different from \"du\" usage, because it\n" +
         "\tmeasures raw space used by replication, checksums, snapshots\n" +
@@ -2073,7 +2093,7 @@ public class DFSAdmin extends FsShell {
     if ("-report".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-report] [-live] [-dead] [-decommissioning]"
-          + " [-enteringmaintenance] [-inmaintenance]");
+          + " [-enteringmaintenance] [-inmaintenance] [-slownodes]");
     } else if ("-safemode".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-safemode enter | leave | get | wait | forceExit]");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 3d53207c0ff..b21a1f14e27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -345,7 +345,7 @@ Runs a HDFS datanode.
 
 Usage:
 
-        hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance]]
+        hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance] [-slownodes]]
         hdfs dfsadmin [-safemode enter | leave | get | wait | forceExit]
         hdfs dfsadmin [-saveNamespace [-beforeShutdown]]
         hdfs dfsadmin [-rollEdits]
@@ -383,7 +383,7 @@ Usage:
 
 | COMMAND\_OPTION | Description |
 |:---- |:---- |
-| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. |
+| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` `[-slownodes]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. Filters are either based on the DN state (e.g. live, dead, decommissioning) or the nature of the DN (e.g. slow [...]
 | `-safemode` enter\|leave\|get\|wait\|forceExit | Safe mode maintenance command. Safe mode is a Namenode state in which it <br/>1. does not accept changes to the name space (read-only) <br/>2. does not replicate or delete blocks. <br/>Safe mode is entered automatically at Namenode startup, and leaves safe mode automatically when the configured minimum percentage of blocks satisfies the minimum replication condition. If Namenode detects any anomaly then it will linger in safe mode till t [...]
 | `-saveNamespace` `[-beforeShutdown]` | Save current namespace into storage directories and reset edits log. Requires safe mode. If the "beforeShutdown" option is given, the NameNode does a checkpoint if and only if no checkpoint has been done during a time window (a configurable number of checkpoint periods). This is usually used before shutting down the NameNode to prevent potential fsimage/editlog corruption. |
 | `-rollEdits` | Rolls the edit log on the active NameNode. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
new file mode 100644
index 00000000000..583c3159d5c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys
+    .DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
+
+/**
+ * Tests to report slow running datanodes.
+ */
+public class TestSlowDatanodeReport {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSlowDatanodeReport.class);
+
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void testSetup() throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000");
+    conf.set(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
+    conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1");
+    conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1");
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testSingleNodeReport() throws Exception {
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    DataNode slowNode = dataNodes.get(1);
+    dataNodes.get(0).getPeerMetrics().setTestOutliers(
+        ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), 15.5));
+    DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
+    Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats();
+        LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo));
+        return slowNodeInfo.length == 1;
+      } catch (IOException e) {
+        LOG.error("Failed to retrieve slownode report", e);
+        return false;
+      }
+    }, 2000, 180000, "Slow nodes could not be detected");
+  }
+
+  @Test
+  public void testMultiNodesReport() throws Exception {
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
+        dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
+    dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
+        dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
+    DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
+    Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats();
+        LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo));
+        return slowNodeInfo.length == 2;
+      } catch (IOException e) {
+        LOG.error("Failed to retrieve slownode report", e);
+        return false;
+      }
+    }, 2000, 200000, "Slow nodes could not be detected");
+  }
+
+}
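
Both tests poll with GenericTestUtils.waitFor (a 2 second check interval against generous 180/200 second deadlines) rather than asserting immediately, since the injected outlier maps only reach the NameNode through subsequent datanode heartbeat reports.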
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
index 6142a1a566a..6a897aa6092 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
@@ -15788,7 +15788,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-report \[-live\] \[-dead\] \[-decommissioning\] \[-enteringmaintenance\] \[-inmaintenance\]:(.)*</expected-output>
+          <expected-output>^-report \[-live\] \[-dead\] \[-decommissioning\] \[-enteringmaintenance\] \[-inmaintenance\] \[-slownodes\]:(.)*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>

