You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/05/21 02:11:38 UTC

[hadoop] branch branch-3.3 updated: HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)

This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ab3a9cedc9c HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
ab3a9cedc9c is described below

commit ab3a9cedc9cffe1363886a871efa947078330eff
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri May 20 18:28:59 2022 -0700

    HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
    
    Reviewed-by: Wei-Chiu Chuang <we...@apache.org>
    Signed-off-by: Tao Li <to...@apache.org>
---
 .../server/blockmanagement/DatanodeManager.java    |   5 +-
 .../blockmanagement/SlowPeerDisabledTracker.java   |   6 +-
 .../server/blockmanagement/SlowPeerJsonReport.java |  84 +++++++++++++
 .../SlowPeerLatencyWithReportingNode.java          |  88 ++++++++++++++
 .../server/blockmanagement/SlowPeerTracker.java    | 135 ++++++++++-----------
 .../apache/hadoop/hdfs/TestSlowDatanodeReport.java |  15 ++-
 .../TestReplicationPolicyExcludeSlowNodes.java     |  12 +-
 .../blockmanagement/TestSlowPeerTracker.java       | 104 ++++++++++------
 8 files changed, 324 insertions(+), 125 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9fe9ace081d..ddb17d42cdd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1886,8 +1886,9 @@ public class DatanodeManager {
         if (LOG.isDebugEnabled()) {
           LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
         }
-        for (String slowNodeId : slowPeersMap.keySet()) {
-          slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
+        for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
+          slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
+              slowNodeId.getValue());
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
index 2f6be2f5a7b..567984204a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
@@ -58,20 +58,20 @@ public class SlowPeerDisabledTracker extends SlowPeerTracker {
   }
 
   @Override
-  public void addReport(String slowNode, String reportingNode) {
+  public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
     LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
   }
 
   @Override
-  public Set<String> getReportsForNode(String slowNode) {
+  public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
     LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
     return ImmutableSet.of();
   }
 
   @Override
-  public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+  public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
     LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
             + "To enable it, please enable config {}.",
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java
new file mode 100644
index 00000000000..b9b741e9d61
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.SortedSet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This structure is a thin wrapper that pairs a slow node with its sorted set of
+ * per-reporting-node latency entries, to make Json [de]serialization easy.
+ */
+@InterfaceAudience.Private
+final class SlowPeerJsonReport {
+
+  @JsonProperty("SlowNode")
+  private final String slowNode;
+
+  @JsonProperty("SlowPeerLatencyWithReportingNodes")
+  private final SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes;
+
+  SlowPeerJsonReport(
+      @JsonProperty("SlowNode")
+          String slowNode,
+      @JsonProperty("SlowPeerLatencyWithReportingNodes")
+          SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes) {
+    this.slowNode = slowNode;
+    this.slowPeerLatencyWithReportingNodes = slowPeerLatencyWithReportingNodes;
+  }
+
+  public String getSlowNode() {
+    return slowNode;
+  }
+
+  public SortedSet<SlowPeerLatencyWithReportingNode> getSlowPeerLatencyWithReportingNodes() {
+    return slowPeerLatencyWithReportingNodes; // the constructor's instance, not a copy
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SlowPeerJsonReport that = (SlowPeerJsonReport) o;
+
+    return new EqualsBuilder()
+        .append(slowNode, that.slowNode)
+        .append(slowPeerLatencyWithReportingNodes, that.slowPeerLatencyWithReportingNodes)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+        .append(slowNode)
+        .append(slowPeerLatencyWithReportingNodes)
+        .toHashCode();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java
new file mode 100644
index 00000000000..a3f90062600
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class represents the reporting node and the slow node's latency as observed by the
+ * reporting node. Natural ordering is by reporting node only. Used by SlowPeerJsonReport class.
+ */
+@InterfaceAudience.Private
+final class SlowPeerLatencyWithReportingNode
+    implements Comparable<SlowPeerLatencyWithReportingNode> {
+
+  @JsonProperty("ReportingNode")
+  private final String reportingNode;
+
+  @JsonProperty("ReportedLatency")
+  private final Double reportedLatency;
+
+  SlowPeerLatencyWithReportingNode(
+      @JsonProperty("ReportingNode")
+          String reportingNode,
+      @JsonProperty("ReportedLatency")
+          Double reportedLatency) {
+    this.reportingNode = reportingNode;
+    this.reportedLatency = reportedLatency;
+  }
+
+  public String getReportingNode() {
+    return reportingNode;
+  }
+
+  public Double getReportedLatency() {
+    return reportedLatency;
+  }
+
+  @Override
+  public int compareTo(SlowPeerLatencyWithReportingNode o) {
+    return this.reportingNode.compareTo(o.getReportingNode()); // ignores latency, unlike equals
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SlowPeerLatencyWithReportingNode that = (SlowPeerLatencyWithReportingNode) o;
+
+    return new EqualsBuilder()
+        .append(reportingNode, that.reportingNode)
+        .append(reportedLatency, that.reportedLatency)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+        .append(reportingNode)
+        .append(reportedLatency)
+        .toHashCode();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index 9998efe7f20..e8a7c825d1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ -37,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +93,7 @@ public class SlowPeerTracker {
    * Stale reports are not evicted proactively and can potentially
    * hang around forever.
    */
-  private final ConcurrentMap<String, ConcurrentMap<String, Long>>
+  private final ConcurrentMap<String, ConcurrentMap<String, LatencyWithLastReportTime>>
       allReports;
 
   public SlowPeerTracker(Configuration conf, Timer timer) {
@@ -123,12 +121,12 @@ public class SlowPeerTracker {
    * Add a new report. DatanodeIds can be the DataNodeIds or addresses
    * We don't care as long as the caller is consistent.
    *
-   * @param reportingNode DataNodeId of the node reporting on its peer.
    * @param slowNode DataNodeId of the peer suspected to be slow.
+   * @param reportingNode DataNodeId of the node reporting on its peer.
+   * @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
    */
-  public void addReport(String slowNode,
-                        String reportingNode) {
-    ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
+  public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
+    ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);
 
     if (nodeEntries == null) {
       // putIfAbsent guards against multiple writers.
@@ -137,7 +135,8 @@ public class SlowPeerTracker {
     }
 
     // Replace the existing entry from this node, if any.
-    nodeEntries.put(reportingNode, timer.monotonicNow());
+    nodeEntries.put(reportingNode,
+        new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
   }
 
   /**
@@ -147,8 +146,8 @@ public class SlowPeerTracker {
    * @param slowNode target node Id.
    * @return set of reports which implicate the target node as being slow.
    */
-  public Set<String> getReportsForNode(String slowNode) {
-    final ConcurrentMap<String, Long> nodeEntries =
+  public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
+    final ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries =
         allReports.get(slowNode);
 
     if (nodeEntries == null || nodeEntries.isEmpty()) {
@@ -163,17 +162,19 @@ public class SlowPeerTracker {
    *
    * @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
    */
-  public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+  public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
     if (allReports.isEmpty()) {
       return ImmutableMap.of();
     }
 
-    final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
+    final Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> allNodesValidReports =
+        new HashMap<>();
     final long now = timer.monotonicNow();
 
-    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
-        allReports.entrySet()) {
-      SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
+    for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
+        : allReports.entrySet()) {
+      SortedSet<SlowPeerLatencyWithReportingNode> validReports =
+          filterNodeReports(entry.getValue(), now);
       if (!validReports.isEmpty()) {
         allNodesValidReports.put(entry.getKey(), validReports);
       }
@@ -184,17 +185,18 @@ public class SlowPeerTracker {
   /**
    * Filter the given reports to return just the valid ones.
    *
-   * @param reports
-   * @param now
-   * @return
+   * @param reports Current set of reports.
+   * @param now Current time.
+   * @return Set of valid reports that were created within last reportValidityMs millis.
    */
-  private SortedSet<String> filterNodeReports(
-      ConcurrentMap<String, Long> reports, long now) {
-    final SortedSet<String> validReports = new TreeSet<>();
-
-    for (Map.Entry<String, Long> entry : reports.entrySet()) {
-      if (now - entry.getValue() < reportValidityMs) {
-        validReports.add(entry.getKey());
+  private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
+      ConcurrentMap<String, LatencyWithLastReportTime> reports, long now) {
+    final SortedSet<SlowPeerLatencyWithReportingNode> validReports = new TreeSet<>();
+
+    for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
+      if (now - entry.getValue().getTime() < reportValidityMs) {
+        validReports.add(
+            new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
       }
     }
     return validReports;
@@ -206,7 +208,7 @@ public class SlowPeerTracker {
    *         serialization failed.
    */
   public String getJson() {
-    Collection<ReportForJson> validReports = getJsonReports(
+    Collection<SlowPeerJsonReport> validReports = getJsonReports(
         maxNodesToReport);
     try {
       return WRITER.writeValueAsString(validReports);
@@ -217,42 +219,15 @@ public class SlowPeerTracker {
     }
   }
 
-  /**
-   * This structure is a thin wrapper over reports to make Json
-   * [de]serialization easy.
-   */
-  public static class ReportForJson {
-    @JsonProperty("SlowNode")
-    final private String slowNode;
-
-    @JsonProperty("ReportingNodes")
-    final private SortedSet<String> reportingNodes;
-
-    public ReportForJson(
-        @JsonProperty("SlowNode") String slowNode,
-        @JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
-      this.slowNode = slowNode;
-      this.reportingNodes = reportingNodes;
-    }
-
-    public String getSlowNode() {
-      return slowNode;
-    }
-
-    public SortedSet<String> getReportingNodes() {
-      return reportingNodes;
-    }
-  }
-
   /**
    * Returns all tracking slow peers.
    * @param numNodes
    * @return
    */
   public List<String> getSlowNodes(int numNodes) {
-    Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
+    Collection<SlowPeerJsonReport> jsonReports = getJsonReports(numNodes);
     ArrayList<String> slowNodes = new ArrayList<>();
-    for (ReportForJson jsonReport : jsonReports) {
+    for (SlowPeerJsonReport jsonReport : jsonReports) {
       slowNodes.add(jsonReport.getSlowNode());
     }
     if (!slowNodes.isEmpty()) {
@@ -267,35 +242,30 @@ public class SlowPeerTracker {
    * @param numNodes number of nodes to return. This is to limit the
    *                 size of the generated JSON.
    */
-  private Collection<ReportForJson> getJsonReports(int numNodes) {
+  private Collection<SlowPeerJsonReport> getJsonReports(int numNodes) {
     if (allReports.isEmpty()) {
       return Collections.emptyList();
     }
 
-    final PriorityQueue<ReportForJson> topNReports =
-        new PriorityQueue<>(allReports.size(),
-            new Comparator<ReportForJson>() {
-          @Override
-          public int compare(ReportForJson o1, ReportForJson o2) {
-            return Ints.compare(o1.reportingNodes.size(),
-                o2.reportingNodes.size());
-          }
-        });
+    final PriorityQueue<SlowPeerJsonReport> topNReports = new PriorityQueue<>(allReports.size(),
+        (o1, o2) -> Ints.compare(o1.getSlowPeerLatencyWithReportingNodes().size(),
+            o2.getSlowPeerLatencyWithReportingNodes().size()));
 
     final long now = timer.monotonicNow();
 
-    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
-        allReports.entrySet()) {
-      SortedSet<String> validReports = filterNodeReports(
-          entry.getValue(), now);
+    for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
+        : allReports.entrySet()) {
+      SortedSet<SlowPeerLatencyWithReportingNode> validReports =
+          filterNodeReports(entry.getValue(), now);
       if (!validReports.isEmpty()) {
         if (topNReports.size() < numNodes) {
-          topNReports.add(new ReportForJson(entry.getKey(), validReports));
-        } else if (topNReports.peek().getReportingNodes().size() <
-            validReports.size()){
+          topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
+        } else if (topNReports.peek() != null
+            && topNReports.peek().getSlowPeerLatencyWithReportingNodes().size()
+            < validReports.size()) {
           // Remove the lowest element
           topNReports.poll();
-          topNReports.add(new ReportForJson(entry.getKey(), validReports));
+          topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
         }
       }
     }
@@ -306,4 +276,23 @@ public class SlowPeerTracker {
   long getReportValidityMs() {
     return reportValidityMs;
   }
+
+  private static class LatencyWithLastReportTime { // value stored per reporting node
+    private final Long time; // timer.monotonicNow() taken when the report was added
+    private final Double latency; // slow node latency as reported by the reporting node
+
+    LatencyWithLastReportTime(Long time, Double latency) {
+      this.time = time;
+      this.latency = latency;
+    }
+
+    public Long getTime() {
+      return time;
+    }
+
+    public Double getLatency() {
+      return latency;
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
index 583c3159d5c..d6c728c68ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
@@ -86,13 +86,18 @@ public class TestSlowDatanodeReport {
         return false;
       }
     }, 2000, 180000, "Slow nodes could not be detected");
+    LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
+    Assert.assertTrue(
+        cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname()));
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5"));
   }
 
   @Test
   public void testMultiNodesReport() throws Exception {
     List<DataNode> dataNodes = cluster.getDataNodes();
     dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
-        dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
+        dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5));
     dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
         dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
     DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
@@ -107,6 +112,14 @@ public class TestSlowDatanodeReport {
         return false;
       }
     }, 2000, 200000, "Slow nodes could not be detected");
+    LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
+        .contains(dataNodes.get(1).getDatanodeHostname()));
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
+        .contains(dataNodes.get(2).getDatanodeHostname()));
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5"));
+    Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7"));
   }
 
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
index 3f26ffc1b44..cfc0c9fa977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
@@ -87,12 +87,12 @@ public class TestReplicationPolicyExcludeSlowNodes
 
       // mock slow nodes
       SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
-      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
-      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
-      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
-      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
-      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
-      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463);
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576);
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674);
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456);
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375);
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576);
 
       // waiting for slow nodes collector run
       Thread.sleep(3000);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
index fb2928cc486..67a212f21ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker
-    .ReportForJson;
 import org.apache.hadoop.util.FakeTimer;
 import org.junit.Before;
 import org.junit.Rule;
@@ -37,6 +35,7 @@ import java.io.IOException;
 import java.util.Set;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -59,7 +58,7 @@ public class TestSlowPeerTracker {
   private FakeTimer timer;
   private long reportValidityMs;
   private static final ObjectReader READER =
-      new ObjectMapper().readerFor(new TypeReference<Set<ReportForJson>>() {});
+      new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
 
   @Before
   public void setup() {
@@ -80,9 +79,9 @@ public class TestSlowPeerTracker {
 
   @Test
   public void testReportsAreRetrieved() {
-    tracker.addReport("node2", "node1");
-    tracker.addReport("node3", "node1");
-    tracker.addReport("node3", "node2");
+    tracker.addReport("node2", "node1", 1.2);
+    tracker.addReport("node3", "node1", 2.1);
+    tracker.addReport("node3", "node2", 1.22);
 
     assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
     assertThat(tracker.getReportsForNode("node2").size(), is(1));
@@ -95,9 +94,9 @@ public class TestSlowPeerTracker {
    */
   @Test
   public void testAllReportsAreExpired() {
-    tracker.addReport("node2", "node1");
-    tracker.addReport("node3", "node2");
-    tracker.addReport("node1", "node3");
+    tracker.addReport("node2", "node1", 0.123);
+    tracker.addReport("node3", "node2", 0.2334);
+    tracker.addReport("node1", "node3", 1.234);
 
     // No reports should expire after 1ms.
     timer.advance(1);
@@ -117,13 +116,14 @@ public class TestSlowPeerTracker {
    */
   @Test
   public void testSomeReportsAreExpired() {
-    tracker.addReport("node3", "node1");
-    tracker.addReport("node3", "node2");
+    tracker.addReport("node3", "node1", 1.234);
+    tracker.addReport("node3", "node2", 1.222);
     timer.advance(reportValidityMs);
-    tracker.addReport("node3", "node4");
+    tracker.addReport("node3", "node4", 1.20);
     assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
     assertThat(tracker.getReportsForNode("node3").size(), is(1));
-    assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+    assertEquals(1, tracker.getReportsForNode("node3").stream()
+        .filter(e -> e.getReportingNode().equals("node4")).count());
   }
 
   /**
@@ -131,24 +131,24 @@ public class TestSlowPeerTracker {
    */
   @Test
   public void testReplacement() {
-    tracker.addReport("node2", "node1");
+    tracker.addReport("node2", "node1", 2.1);
     timer.advance(reportValidityMs); // Expire the report.
     assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
 
     // This should replace the expired report with a newer valid one.
-    tracker.addReport("node2", "node1");
+    tracker.addReport("node2", "node1", 0.001);
     assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
     assertThat(tracker.getReportsForNode("node2").size(), is(1));
   }
 
   @Test
   public void testGetJson() throws IOException {
-    tracker.addReport("node1", "node2");
-    tracker.addReport("node2", "node3");
-    tracker.addReport("node2", "node1");
-    tracker.addReport("node4", "node1");
+    tracker.addReport("node1", "node2", 1.1);
+    tracker.addReport("node2", "node3", 1.23);
+    tracker.addReport("node2", "node1", 2.13);
+    tracker.addReport("node4", "node1", 1.244);
 
-    final Set<ReportForJson> reports = getAndDeserializeJson();
+    final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
 
     // And ensure its contents are what we expect.
     assertThat(reports.size(), is(3));
@@ -161,19 +161,19 @@ public class TestSlowPeerTracker {
 
   @Test
   public void testGetJsonSizeIsLimited() throws IOException {
-    tracker.addReport("node1", "node2");
-    tracker.addReport("node1", "node3");
-    tracker.addReport("node2", "node3");
-    tracker.addReport("node2", "node4");
-    tracker.addReport("node3", "node4");
-    tracker.addReport("node3", "node5");
-    tracker.addReport("node4", "node6");
-    tracker.addReport("node5", "node6");
-    tracker.addReport("node5", "node7");
-    tracker.addReport("node6", "node7");
-    tracker.addReport("node6", "node8");
-
-    final Set<ReportForJson> reports = getAndDeserializeJson();
+    tracker.addReport("node1", "node2", 1.634);
+    tracker.addReport("node1", "node3", 2.3566);
+    tracker.addReport("node2", "node3", 3.869);
+    tracker.addReport("node2", "node4", 4.1356);
+    tracker.addReport("node3", "node4", 1.73057);
+    tracker.addReport("node3", "node5", 2.4956730);
+    tracker.addReport("node4", "node6", 3.29847);
+    tracker.addReport("node5", "node6", 4.13444);
+    tracker.addReport("node5", "node7", 5.10845);
+    tracker.addReport("node6", "node8", 2.37464);
+    tracker.addReport("node6", "node7", 1.29475656);
+
+    final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
 
     // Ensure that node4 is not in the list since it was
     // tagged by just one peer and we already have 5 other nodes.
@@ -185,22 +185,46 @@ public class TestSlowPeerTracker {
     assertTrue(isNodeInReports(reports, "node3"));
     assertTrue(isNodeInReports(reports, "node5"));
     assertTrue(isNodeInReports(reports, "node6"));
+
+    assertEquals(1, reports.stream().filter(
+        e -> e.getSlowNode().equals("node1") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+            && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.634)
+            && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(2.3566))
+        .count());
+
+    assertEquals(1, reports.stream().filter(
+        e -> e.getSlowNode().equals("node2") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+            && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(3.869)
+            && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(4.1356))
+        .count());
+
+    assertEquals(1, reports.stream().filter(
+        e -> e.getSlowNode().equals("node3") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+            && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.73057)
+            && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency()
+            .equals(2.4956730)).count());
+
+    assertEquals(1, reports.stream().filter(
+        e -> e.getSlowNode().equals("node6") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+            && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency()
+            .equals(1.29475656) && e.getSlowPeerLatencyWithReportingNodes().last()
+            .getReportedLatency().equals(2.37464)).count());
   }
 
   @Test
   public void testLowRankedElementsIgnored() throws IOException {
     // Insert 5 nodes with 2 peer reports each.
     for (int i = 0; i < 5; ++i) {
-      tracker.addReport("node" + i, "reporter1");
-      tracker.addReport("node" + i, "reporter2");
+      tracker.addReport("node" + i, "reporter1", 1.295673);
+      tracker.addReport("node" + i, "reporter2", 2.38560);
     }
 
     // Insert 10 nodes with 1 peer report each.
     for (int i = 10; i < 20; ++i) {
-      tracker.addReport("node" + i, "reporter1");
+      tracker.addReport("node" + i, "reporter1", 3.4957);
     }
 
-    final Set<ReportForJson> reports = getAndDeserializeJson();
+    final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
 
     // Ensure that only the first 5 nodes with two reports each were
     // included in the JSON.
@@ -210,8 +234,8 @@ public class TestSlowPeerTracker {
   }
 
   private boolean isNodeInReports(
-      Set<ReportForJson> reports, String node) {
-    for (ReportForJson report : reports) {
+      Set<SlowPeerJsonReport> reports, String node) {
+    for (SlowPeerJsonReport report : reports) {
       if (report.getSlowNode().equalsIgnoreCase(node)) {
         return true;
       }
@@ -219,7 +243,7 @@ public class TestSlowPeerTracker {
     return false;
   }
 
-  private Set<ReportForJson> getAndDeserializeJson()
+  private Set<SlowPeerJsonReport> getAndDeserializeJson()
       throws IOException {
     final String json = tracker.getJson();
     LOG.info("Got JSON: {}", json);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org