You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/05/21 02:11:38 UTC
[hadoop] branch branch-3.3 updated: HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
This is an automated email from the ASF dual-hosted git repository.
tomscut pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ab3a9cedc9c HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
ab3a9cedc9c is described below
commit ab3a9cedc9cffe1363886a871efa947078330eff
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri May 20 18:28:59 2022 -0700
HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
Reviewed-by: Wei-Chiu Chuang <we...@apache.org>
Signed-off-by: Tao Li <to...@apache.org>
---
.../server/blockmanagement/DatanodeManager.java | 5 +-
.../blockmanagement/SlowPeerDisabledTracker.java | 6 +-
.../server/blockmanagement/SlowPeerJsonReport.java | 84 +++++++++++++
.../SlowPeerLatencyWithReportingNode.java | 88 ++++++++++++++
.../server/blockmanagement/SlowPeerTracker.java | 135 ++++++++++-----------
.../apache/hadoop/hdfs/TestSlowDatanodeReport.java | 15 ++-
.../TestReplicationPolicyExcludeSlowNodes.java | 12 +-
.../blockmanagement/TestSlowPeerTracker.java | 104 ++++++++++------
8 files changed, 324 insertions(+), 125 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9fe9ace081d..ddb17d42cdd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1886,8 +1886,9 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
- for (String slowNodeId : slowPeersMap.keySet()) {
- slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
+ for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
+ slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
+ slowNodeId.getValue());
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
index 2f6be2f5a7b..567984204a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
@@ -58,20 +58,20 @@ public class SlowPeerDisabledTracker extends SlowPeerTracker {
}
@Override
- public void addReport(String slowNode, String reportingNode) {
+ public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
}
@Override
- public Set<String> getReportsForNode(String slowNode) {
+ public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableSet.of();
}
@Override
- public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+ public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
+ "To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java
new file mode 100644
index 00000000000..b9b741e9d61
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerJsonReport.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.SortedSet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This structure is a thin wrapper over slow peer reports to make Json
+ * [de]serialization easy.
+ */
+@InterfaceAudience.Private
+final class SlowPeerJsonReport {
+
+ @JsonProperty("SlowNode")
+ private final String slowNode;
+
+ @JsonProperty("SlowPeerLatencyWithReportingNodes")
+ private final SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes;
+
+ SlowPeerJsonReport(
+ @JsonProperty("SlowNode")
+ String slowNode,
+ @JsonProperty("SlowPeerLatencyWithReportingNodes")
+ SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes) {
+ this.slowNode = slowNode;
+ this.slowPeerLatencyWithReportingNodes = slowPeerLatencyWithReportingNodes;
+ }
+
+ public String getSlowNode() {
+ return slowNode;
+ }
+
+ public SortedSet<SlowPeerLatencyWithReportingNode> getSlowPeerLatencyWithReportingNodes() {
+ return slowPeerLatencyWithReportingNodes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlowPeerJsonReport that = (SlowPeerJsonReport) o;
+
+ return new EqualsBuilder()
+ .append(slowNode, that.slowNode)
+ .append(slowPeerLatencyWithReportingNodes, that.slowPeerLatencyWithReportingNodes)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(slowNode)
+ .append(slowPeerLatencyWithReportingNodes)
+ .toHashCode();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java
new file mode 100644
index 00000000000..a3f90062600
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class represents the reporting node and the slow node's latency as observed by the
+ * reporting node. This class is used by SlowPeerJsonReport class.
+ */
+@InterfaceAudience.Private
+final class SlowPeerLatencyWithReportingNode
+ implements Comparable<SlowPeerLatencyWithReportingNode> {
+
+ @JsonProperty("ReportingNode")
+ private final String reportingNode;
+
+ @JsonProperty("ReportedLatency")
+ private final Double reportedLatency;
+
+ SlowPeerLatencyWithReportingNode(
+ @JsonProperty("ReportingNode")
+ String reportingNode,
+ @JsonProperty("ReportedLatency")
+ Double reportedLatency) {
+ this.reportingNode = reportingNode;
+ this.reportedLatency = reportedLatency;
+ }
+
+ public String getReportingNode() {
+ return reportingNode;
+ }
+
+ public Double getReportedLatency() {
+ return reportedLatency;
+ }
+
+ @Override
+ public int compareTo(SlowPeerLatencyWithReportingNode o) {
+ return this.reportingNode.compareTo(o.getReportingNode());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlowPeerLatencyWithReportingNode that = (SlowPeerLatencyWithReportingNode) o;
+
+ return new EqualsBuilder()
+ .append(reportingNode, that.reportingNode)
+ .append(reportedLatency, that.reportedLatency)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(reportingNode)
+ .append(reportedLatency)
+ .toHashCode();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index 9998efe7f20..e8a7c825d1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
@@ -37,7 +36,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -95,7 +93,7 @@ public class SlowPeerTracker {
* Stale reports are not evicted proactively and can potentially
* hang around forever.
*/
- private final ConcurrentMap<String, ConcurrentMap<String, Long>>
+ private final ConcurrentMap<String, ConcurrentMap<String, LatencyWithLastReportTime>>
allReports;
public SlowPeerTracker(Configuration conf, Timer timer) {
@@ -123,12 +121,12 @@ public class SlowPeerTracker {
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
* We don't care as long as the caller is consistent.
*
- * @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNode DataNodeId of the peer suspected to be slow.
+ * @param reportingNode DataNodeId of the node reporting on its peer.
+ * @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
*/
- public void addReport(String slowNode,
- String reportingNode) {
- ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
+ public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
+ ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
// putIfAbsent guards against multiple writers.
@@ -137,7 +135,8 @@ public class SlowPeerTracker {
}
// Replace the existing entry from this node, if any.
- nodeEntries.put(reportingNode, timer.monotonicNow());
+ nodeEntries.put(reportingNode,
+ new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
}
/**
@@ -147,8 +146,8 @@ public class SlowPeerTracker {
* @param slowNode target node Id.
* @return set of reports which implicate the target node as being slow.
*/
- public Set<String> getReportsForNode(String slowNode) {
- final ConcurrentMap<String, Long> nodeEntries =
+ public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
+ final ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries =
allReports.get(slowNode);
if (nodeEntries == null || nodeEntries.isEmpty()) {
@@ -163,17 +162,19 @@ public class SlowPeerTracker {
*
* @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
*/
- public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+ public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
if (allReports.isEmpty()) {
return ImmutableMap.of();
}
- final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
+ final Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> allNodesValidReports =
+ new HashMap<>();
final long now = timer.monotonicNow();
- for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
- allReports.entrySet()) {
- SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
+ for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
+ : allReports.entrySet()) {
+ SortedSet<SlowPeerLatencyWithReportingNode> validReports =
+ filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
allNodesValidReports.put(entry.getKey(), validReports);
}
@@ -184,17 +185,18 @@ public class SlowPeerTracker {
/**
* Filter the given reports to return just the valid ones.
*
- * @param reports
- * @param now
- * @return
+ * @param reports Current set of reports.
+ * @param now Current time.
+ * @return Set of valid reports that were created within last reportValidityMs millis.
*/
- private SortedSet<String> filterNodeReports(
- ConcurrentMap<String, Long> reports, long now) {
- final SortedSet<String> validReports = new TreeSet<>();
-
- for (Map.Entry<String, Long> entry : reports.entrySet()) {
- if (now - entry.getValue() < reportValidityMs) {
- validReports.add(entry.getKey());
+ private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
+ ConcurrentMap<String, LatencyWithLastReportTime> reports, long now) {
+ final SortedSet<SlowPeerLatencyWithReportingNode> validReports = new TreeSet<>();
+
+ for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
+ if (now - entry.getValue().getTime() < reportValidityMs) {
+ validReports.add(
+ new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
}
}
return validReports;
@@ -206,7 +208,7 @@ public class SlowPeerTracker {
* serialization failed.
*/
public String getJson() {
- Collection<ReportForJson> validReports = getJsonReports(
+ Collection<SlowPeerJsonReport> validReports = getJsonReports(
maxNodesToReport);
try {
return WRITER.writeValueAsString(validReports);
@@ -217,42 +219,15 @@ public class SlowPeerTracker {
}
}
- /**
- * This structure is a thin wrapper over reports to make Json
- * [de]serialization easy.
- */
- public static class ReportForJson {
- @JsonProperty("SlowNode")
- final private String slowNode;
-
- @JsonProperty("ReportingNodes")
- final private SortedSet<String> reportingNodes;
-
- public ReportForJson(
- @JsonProperty("SlowNode") String slowNode,
- @JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
- this.slowNode = slowNode;
- this.reportingNodes = reportingNodes;
- }
-
- public String getSlowNode() {
- return slowNode;
- }
-
- public SortedSet<String> getReportingNodes() {
- return reportingNodes;
- }
- }
-
/**
* Returns all tracking slow peers.
* @param numNodes
* @return
*/
public List<String> getSlowNodes(int numNodes) {
- Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
+ Collection<SlowPeerJsonReport> jsonReports = getJsonReports(numNodes);
ArrayList<String> slowNodes = new ArrayList<>();
- for (ReportForJson jsonReport : jsonReports) {
+ for (SlowPeerJsonReport jsonReport : jsonReports) {
slowNodes.add(jsonReport.getSlowNode());
}
if (!slowNodes.isEmpty()) {
@@ -267,35 +242,30 @@ public class SlowPeerTracker {
* @param numNodes number of nodes to return. This is to limit the
* size of the generated JSON.
*/
- private Collection<ReportForJson> getJsonReports(int numNodes) {
+ private Collection<SlowPeerJsonReport> getJsonReports(int numNodes) {
if (allReports.isEmpty()) {
return Collections.emptyList();
}
- final PriorityQueue<ReportForJson> topNReports =
- new PriorityQueue<>(allReports.size(),
- new Comparator<ReportForJson>() {
- @Override
- public int compare(ReportForJson o1, ReportForJson o2) {
- return Ints.compare(o1.reportingNodes.size(),
- o2.reportingNodes.size());
- }
- });
+ final PriorityQueue<SlowPeerJsonReport> topNReports = new PriorityQueue<>(allReports.size(),
+ (o1, o2) -> Ints.compare(o1.getSlowPeerLatencyWithReportingNodes().size(),
+ o2.getSlowPeerLatencyWithReportingNodes().size()));
final long now = timer.monotonicNow();
- for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
- allReports.entrySet()) {
- SortedSet<String> validReports = filterNodeReports(
- entry.getValue(), now);
+ for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
+ : allReports.entrySet()) {
+ SortedSet<SlowPeerLatencyWithReportingNode> validReports =
+ filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
if (topNReports.size() < numNodes) {
- topNReports.add(new ReportForJson(entry.getKey(), validReports));
- } else if (topNReports.peek().getReportingNodes().size() <
- validReports.size()){
+ topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
+ } else if (topNReports.peek() != null
+ && topNReports.peek().getSlowPeerLatencyWithReportingNodes().size()
+ < validReports.size()) {
// Remove the lowest element
topNReports.poll();
- topNReports.add(new ReportForJson(entry.getKey(), validReports));
+ topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
}
}
}
@@ -306,4 +276,23 @@ public class SlowPeerTracker {
long getReportValidityMs() {
return reportValidityMs;
}
+
+ private static class LatencyWithLastReportTime {
+ private final Long time;
+ private final Double latency;
+
+ LatencyWithLastReportTime(Long time, Double latency) {
+ this.time = time;
+ this.latency = latency;
+ }
+
+ public Long getTime() {
+ return time;
+ }
+
+ public Double getLatency() {
+ return latency;
+ }
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
index 583c3159d5c..d6c728c68ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java
@@ -86,13 +86,18 @@ public class TestSlowDatanodeReport {
return false;
}
}, 2000, 180000, "Slow nodes could not be detected");
+ LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
+ Assert.assertTrue(
+ cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname()));
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5"));
}
@Test
public void testMultiNodesReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
- dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
+ dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5));
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
@@ -107,6 +112,14 @@ public class TestSlowDatanodeReport {
return false;
}
}, 2000, 200000, "Slow nodes could not be detected");
+ LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
+ .contains(dataNodes.get(1).getDatanodeHostname()));
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
+ .contains(dataNodes.get(2).getDatanodeHostname()));
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5"));
+ Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7"));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
index 3f26ffc1b44..cfc0c9fa977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
@@ -87,12 +87,12 @@ public class TestReplicationPolicyExcludeSlowNodes
// mock slow nodes
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
- tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
- tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
- tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
- tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
- tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
- tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
+ tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463);
+ tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576);
+ tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674);
+ tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456);
+ tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375);
+ tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576);
// waiting for slow nodes collector run
Thread.sleep(3000);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
index fb2928cc486..67a212f21ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker
- .ReportForJson;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
@@ -37,6 +35,7 @@ import java.io.IOException;
import java.util.Set;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -59,7 +58,7 @@ public class TestSlowPeerTracker {
private FakeTimer timer;
private long reportValidityMs;
private static final ObjectReader READER =
- new ObjectMapper().readerFor(new TypeReference<Set<ReportForJson>>() {});
+ new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
@Before
public void setup() {
@@ -80,9 +79,9 @@ public class TestSlowPeerTracker {
@Test
public void testReportsAreRetrieved() {
- tracker.addReport("node2", "node1");
- tracker.addReport("node3", "node1");
- tracker.addReport("node3", "node2");
+ tracker.addReport("node2", "node1", 1.2);
+ tracker.addReport("node3", "node1", 2.1);
+ tracker.addReport("node3", "node2", 1.22);
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
@@ -95,9 +94,9 @@ public class TestSlowPeerTracker {
*/
@Test
public void testAllReportsAreExpired() {
- tracker.addReport("node2", "node1");
- tracker.addReport("node3", "node2");
- tracker.addReport("node1", "node3");
+ tracker.addReport("node2", "node1", 0.123);
+ tracker.addReport("node3", "node2", 0.2334);
+ tracker.addReport("node1", "node3", 1.234);
// No reports should expire after 1ms.
timer.advance(1);
@@ -117,13 +116,14 @@ public class TestSlowPeerTracker {
*/
@Test
public void testSomeReportsAreExpired() {
- tracker.addReport("node3", "node1");
- tracker.addReport("node3", "node2");
+ tracker.addReport("node3", "node1", 1.234);
+ tracker.addReport("node3", "node2", 1.222);
timer.advance(reportValidityMs);
- tracker.addReport("node3", "node4");
+ tracker.addReport("node3", "node4", 1.20);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(1));
- assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+ assertEquals(1, tracker.getReportsForNode("node3").stream()
+ .filter(e -> e.getReportingNode().equals("node4")).count());
}
/**
@@ -131,24 +131,24 @@ public class TestSlowPeerTracker {
*/
@Test
public void testReplacement() {
- tracker.addReport("node2", "node1");
+ tracker.addReport("node2", "node1", 2.1);
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
// This should replace the expired report with a newer valid one.
- tracker.addReport("node2", "node1");
+ tracker.addReport("node2", "node1", 0.001);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
}
@Test
public void testGetJson() throws IOException {
- tracker.addReport("node1", "node2");
- tracker.addReport("node2", "node3");
- tracker.addReport("node2", "node1");
- tracker.addReport("node4", "node1");
+ tracker.addReport("node1", "node2", 1.1);
+ tracker.addReport("node2", "node3", 1.23);
+ tracker.addReport("node2", "node1", 2.13);
+ tracker.addReport("node4", "node1", 1.244);
- final Set<ReportForJson> reports = getAndDeserializeJson();
+ final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// And ensure its contents are what we expect.
assertThat(reports.size(), is(3));
@@ -161,19 +161,19 @@ public class TestSlowPeerTracker {
@Test
public void testGetJsonSizeIsLimited() throws IOException {
- tracker.addReport("node1", "node2");
- tracker.addReport("node1", "node3");
- tracker.addReport("node2", "node3");
- tracker.addReport("node2", "node4");
- tracker.addReport("node3", "node4");
- tracker.addReport("node3", "node5");
- tracker.addReport("node4", "node6");
- tracker.addReport("node5", "node6");
- tracker.addReport("node5", "node7");
- tracker.addReport("node6", "node7");
- tracker.addReport("node6", "node8");
-
- final Set<ReportForJson> reports = getAndDeserializeJson();
+ tracker.addReport("node1", "node2", 1.634);
+ tracker.addReport("node1", "node3", 2.3566);
+ tracker.addReport("node2", "node3", 3.869);
+ tracker.addReport("node2", "node4", 4.1356);
+ tracker.addReport("node3", "node4", 1.73057);
+ tracker.addReport("node3", "node5", 2.4956730);
+ tracker.addReport("node4", "node6", 3.29847);
+ tracker.addReport("node5", "node6", 4.13444);
+ tracker.addReport("node5", "node7", 5.10845);
+ tracker.addReport("node6", "node8", 2.37464);
+ tracker.addReport("node6", "node7", 1.29475656);
+
+ final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that node4 is not in the list since it was
// tagged by just one peer and we already have 5 other nodes.
@@ -185,22 +185,46 @@ public class TestSlowPeerTracker {
assertTrue(isNodeInReports(reports, "node3"));
assertTrue(isNodeInReports(reports, "node5"));
assertTrue(isNodeInReports(reports, "node6"));
+
+ assertEquals(1, reports.stream().filter(
+ e -> e.getSlowNode().equals("node1") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+ && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.634)
+ && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(2.3566))
+ .count());
+
+ assertEquals(1, reports.stream().filter(
+ e -> e.getSlowNode().equals("node2") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+ && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(3.869)
+ && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(4.1356))
+ .count());
+
+ assertEquals(1, reports.stream().filter(
+ e -> e.getSlowNode().equals("node3") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+ && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.73057)
+ && e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency()
+ .equals(2.4956730)).count());
+
+ assertEquals(1, reports.stream().filter(
+ e -> e.getSlowNode().equals("node6") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
+ && e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency()
+ .equals(1.29475656) && e.getSlowPeerLatencyWithReportingNodes().last()
+ .getReportedLatency().equals(2.37464)).count());
}
@Test
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
- tracker.addReport("node" + i, "reporter1");
- tracker.addReport("node" + i, "reporter2");
+ tracker.addReport("node" + i, "reporter1", 1.295673);
+ tracker.addReport("node" + i, "reporter2", 2.38560);
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
- tracker.addReport("node" + i, "reporter1");
+ tracker.addReport("node" + i, "reporter1", 3.4957);
}
- final Set<ReportForJson> reports = getAndDeserializeJson();
+ final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that only the first 5 nodes with two reports each were
// included in the JSON.
@@ -210,8 +234,8 @@ public class TestSlowPeerTracker {
}
private boolean isNodeInReports(
- Set<ReportForJson> reports, String node) {
- for (ReportForJson report : reports) {
+ Set<SlowPeerJsonReport> reports, String node) {
+ for (SlowPeerJsonReport report : reports) {
if (report.getSlowNode().equalsIgnoreCase(node)) {
return true;
}
@@ -219,7 +243,7 @@ public class TestSlowPeerTracker {
return false;
}
- private Set<ReportForJson> getAndDeserializeJson()
+ private Set<SlowPeerJsonReport> getAndDeserializeJson()
throws IOException {
final String json = tracker.getJson();
LOG.info("Got JSON: {}", json);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org