You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/27 19:52:14 UTC
[06/14] hadoop git commit: HDFS-10917. Collect peer performance
statistics on DataNode. Contributed by Xiaobing Zhou.
HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e902965
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e902965
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e902965
Branch: refs/heads/YARN-5734
Commit: 4e9029653dfa7a803d73c173cb7044f7e0dc1eb1
Parents: e92a770
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Dec 22 23:46:58 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Dec 22 23:46:58 2016 -0800
----------------------------------------------------------------------
.../hadoop/metrics2/MetricsJsonBuilder.java | 125 +++++++++
.../lib/MutableRatesWithAggregation.java | 40 ++-
.../hadoop/metrics2/lib/RollingAverages.java | 251 +++++++++++++++++++
.../metrics2/lib/TestRollingAverages.java | 123 +++++++++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 +
.../hdfs/server/datanode/BlockReceiver.java | 48 +++-
.../hadoop/hdfs/server/datanode/DataNode.java | 14 +-
.../hdfs/server/datanode/DataNodeMXBean.java | 12 +
.../hdfs/server/datanode/DataXceiver.java | 9 +
.../datanode/metrics/DataNodePeerMetrics.java | 117 +++++++++
.../src/main/resources/hdfs-default.xml | 25 ++
.../datanode/TestDataNodePeerMetrics.java | 92 +++++++
12 files changed, 850 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
new file mode 100644
index 0000000..8e42909
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a JSON dump of metrics.
+ *
+ * Every tag, counter, gauge and metric handed to this builder is stored as a
+ * name/value pair in an insertion-ordered map; a later value recorded under
+ * the same name replaces the earlier one. The {@link #toString()} method
+ * serializes all collected pairs as a single JSON object.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricsJsonBuilder extends MetricsRecordBuilder {
+
+ // Log under this class's own name rather than the abstract base class's,
+ // so that messages can be attributed and filtered correctly.
+ public static final Logger LOG = LoggerFactory
+ .getLogger(MetricsJsonBuilder.class);
+ private final MetricsCollector parent;
+ /** Collected name/value pairs, kept in insertion order. */
+ private final Map<String, Object> innerMetrics = new LinkedHashMap<>();
+
+ /**
+ * Build an instance.
+ * @param parent parent collector. Unused in this instance; only used for
+ * the {@link #parent()} method
+ */
+ public MetricsJsonBuilder(MetricsCollector parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * Records one name/value pair, replacing any value already stored under
+ * the same key.
+ * @param key name of the entry
+ * @param value value of the entry
+ * @return this builder, to allow call chaining
+ */
+ private MetricsRecordBuilder tuple(String key, Object value) {
+ innerMetrics.put(key, value);
+ return this;
+ }
+
+ /** Records the tag under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the tag's value under the tag's name. */
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ return tuple(tag.name(), tag.value());
+ }
+
+ /**
+ * Records the metric under its info name. The stored value is the metric's
+ * {@code toString()} form, not its numeric value.
+ */
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ return tuple(metric.info().name(), metric.toString());
+ }
+
+ /** Records the context under the fixed key {@code "context"}. */
+ @Override
+ public MetricsRecordBuilder setContext(String value) {
+ return tuple("context", value);
+ }
+
+ /** Records the counter value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the counter value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the gauge value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the gauge value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the gauge value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ return tuple(info.name(), value);
+ }
+
+ /** Records the gauge value under {@code info.name()}. */
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ return tuple(info.name(), value);
+ }
+
+ /** @return the collector passed to the constructor */
+ @Override
+ public MetricsCollector parent() {
+ return parent;
+ }
+
+ /**
+ * Serializes all collected pairs as one JSON object.
+ * @return the JSON text; if serialization fails with an
+ * {@link IOException}, a warning is logged and the exception's stack
+ * trace text is returned instead
+ */
+ @Override
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(innerMetrics);
+ } catch (IOException e) {
+ LOG.warn("Failed to dump to Json.", e);
+ return ExceptionUtils.getStackTrace(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index 64eae03..9827ca7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib;
import com.google.common.collect.Sets;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat;
@InterfaceStability.Evolving
public class MutableRatesWithAggregation extends MutableMetric {
static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
- private final Map<String, MutableRate> globalMetrics = new HashMap<>();
+ private final Map<String, MutableRate> globalMetrics =
+ new ConcurrentHashMap<>();
private final Set<Class<?>> protocolCache = Sets.newHashSet();
private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
@@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
// Thread has died; clean up its state
iter.remove();
} else {
- // Aggregate the thread's local samples into the global metrics
- for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
- String name = entry.getKey();
- MutableRate globalMetric = addMetricIfNotExists(name);
- entry.getValue().snapshotInto(globalMetric);
- }
+ aggregateLocalStatesToGlobalMetrics(map);
}
}
for (MutableRate globalMetric : globalMetrics.values()) {
@@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric {
}
}
+ /**
+ * Collects states maintained in {@link ThreadLocal}, if any.
+ * <p>
+ * Looks up the calling thread's local sample map and, when one exists,
+ * folds its samples into the global metrics. Runs while holding this
+ * object's monitor.
+ * </p>
+ */
+ synchronized void collectThreadLocalStates() {
+ final ConcurrentMap<String, ThreadSafeSampleStat> localStats =
+ threadLocalMetricsMap.get();
+ // A null map means there is nothing to collect for the calling thread.
+ if (localStats != null) {
+ aggregateLocalStatesToGlobalMetrics(localStats);
+ }
+ }
+
+ /**
+ * Aggregates the thread's local samples into the global metrics. The caller
+ * should ensure its thread safety.
+ * <p>
+ * For every entry of the given map, the global metric of the same name is
+ * looked up (and created if it does not exist yet) and the entry's samples
+ * are snapshotted into it.
+ * </p>
+ * @param localStats per-thread samples keyed by metric name
+ */
+ private void aggregateLocalStatesToGlobalMetrics(
+ final ConcurrentMap<String, ThreadSafeSampleStat> localStats) {
+ for (Map.Entry<String, ThreadSafeSampleStat> entry : localStats
+ .entrySet()) {
+ String name = entry.getKey();
+ MutableRate globalMetric = addMetricIfNotExists(name);
+ entry.getValue().snapshotInto(globalMetric);
+ }
+ }
+
+ /**
+ * @return the global metrics keyed by metric name. This is the internal map
+ * itself, not a copy, so later updates are visible through it.
+ */
+ Map<String, MutableRate> getGlobalMetrics() {
+ return globalMetrics;
+ }
+
private synchronized MutableRate addMetricIfNotExists(String name) {
MutableRate metric = globalMetrics.get(name);
if (metric == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
new file mode 100644
index 0000000..06ae30d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * <p>
+ * This class maintains a group of rolling average metrics. It implements the
+ * algorithm of rolling average, i.e. a number of sliding windows are kept to
+ * roll over and evict old subsets of samples. Each window has a subset of
+ * samples in a stream, where sub-sum and sub-total are collected. All sub-sums
+ * and sub-totals in all windows will be aggregated to final-sum and final-total
+ * used to compute final average, which is called rolling average.
+ * </p>
+ * <p>
+ * Samples are fed in through {@link #add(String, long)}; a periodic task
+ * scheduled by the constructor moves them into the per-metric windows, and
+ * {@link #snapshot(MetricsRecordBuilder, boolean)} publishes the averages.
+ * Call {@link #close()} to cancel the periodic task.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingAverages extends MutableMetric implements Closeable {
+
+ /** Accumulates the raw samples between two roll-overs. */
+ private final MutableRatesWithAggregation innerMetrics =
+ new MutableRatesWithAggregation();
+
+ /**
+ * Single daemon thread, shared by all instances, that runs the roll-over
+ * tasks.
+ */
+ private static final ScheduledExecutorService SCHEDULER = Executors
+ .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("RollingAverages-%d").build());
+
+ /** Handle of this instance's roll-over task; null once closed. */
+ private ScheduledFuture<?> scheduledTask = null;
+ /** Metrics captured by the most recent roll-over; null before the first. */
+ private Map<String, MutableRate> currentSnapshot;
+ /** Maximum number of windows retained per metric. */
+ private final int numWindows;
+ /** Format for the gauge name; "%s" is replaced by the metric name. */
+ private final String avgInfoNameTemplate;
+ /** Format for the gauge description; "%s" is replaced by the metric name. */
+ private final String avgInfoDescTemplate;
+
+ /** Immutable pair of sample sum and sample count recorded for one window. */
+ private static class SumAndCount {
+ private final double sum;
+ private final long count;
+
+ public SumAndCount(final double sum, final long count) {
+ this.sum = sum;
+ this.count = count;
+ }
+
+ public double getSum() {
+ return sum;
+ }
+
+ public long getCount() {
+ return count;
+ }
+ }
+
+ /**
+ * <p>
+ * key: metric name
+ * </p>
+ * <p>
+ * value: deque where sub-sums and sub-totals for sliding windows are
+ * maintained.
+ * </p>
+ */
+ private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Constructor of {@link RollingAverages}.
+ * <p>
+ * Also schedules the roll-over task on the shared scheduler: it first runs
+ * {@code windowSize} seconds from now and then every {@code windowSize}
+ * seconds.
+ * </p>
+ * @param windowSize
+ * The number of seconds of each window for which sub set of samples
+ * are gathered to compute the rolling average, A.K.A. roll over
+ * interval.
+ * @param numWindows
+ * The number of windows maintained to compute the rolling average.
+ * @param valueName
+ * The name of the measured value (e.g. "Time", "Latency"); it is
+ * used to build the gauge names and descriptions.
+ */
+ public RollingAverages(
+ final int windowSize,
+ final int numWindows,
+ final String valueName) {
+ String uvName = StringUtils.capitalize(valueName);
+ String lvName = StringUtils.uncapitalize(valueName);
+ avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName;
+ avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
+ this.numWindows = numWindows;
+ // NOTE(review): "this" is handed to the task before the constructor
+ // returns; the first run is delayed by windowSize seconds.
+ scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+ windowSize, windowSize, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Constructor of {@link RollingAverages} that uses "Time" as the value
+ * name.
+ * @param windowSize
+ * The number of seconds of each window for which sub set of samples
+ * are gathered to compute rolling average, also A.K.A roll over
+ * interval.
+ * @param numWindows
+ * The number of windows maintained in the same time to compute the
+ * average of the rolling averages.
+ */
+ public RollingAverages(
+ final int windowSize,
+ final int numWindows) {
+ this(windowSize, numWindows, "Time");
+ }
+
+ /**
+ * Publishes the rolling averages.
+ * <p>
+ * When {@code all} is set or the changed flag is raised, one gauge is added
+ * per metric: the sum over all retained windows divided by the total number
+ * of samples in them. Metrics whose windows contain no samples are skipped.
+ * The changed flag is cleared afterwards.
+ * </p>
+ * @param builder record builder that receives the gauges
+ * @param all if true, publish even when nothing changed since the last
+ * snapshot
+ */
+ @Override
+ public void snapshot(MetricsRecordBuilder builder, boolean all) {
+ if (all || changed()) {
+ for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+ : averages.entrySet()) {
+ final String name = entry.getKey();
+ final MetricsInfo avgInfo = info(
+ String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
+ String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
+ double totalSum = 0;
+ long totalCount = 0;
+
+ for (final SumAndCount sumAndCount : entry.getValue()) {
+ totalCount += sumAndCount.getCount();
+ totalSum += sumAndCount.getSum();
+ }
+
+ // Skip metrics without samples to avoid dividing by zero.
+ if (totalCount != 0) {
+ builder.addGauge(avgInfo, totalSum / totalCount);
+ }
+ }
+ if (changed()) {
+ clearChanged();
+ }
+ }
+ }
+
+ /**
+ * Collects states maintained in {@link ThreadLocal}, if any.
+ * Delegates to the inner metrics for the calling thread.
+ */
+ public void collectThreadLocalStates() {
+ innerMetrics.collectThreadLocalStates();
+ }
+
+ /**
+ * Adds one sample to the named metric.
+ * @param name
+ * name of metric
+ * @param value
+ * value of metric
+ */
+ public void add(final String name, final long value) {
+ innerMetrics.add(name, value);
+ }
+
+ /**
+ * Periodic task that snapshots the inner metrics of its parent and rolls
+ * the result over into the parent's windows.
+ */
+ private static class RatesRoller implements Runnable {
+ private final RollingAverages parent;
+
+ public RatesRoller(final RollingAverages parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void run() {
+ synchronized (parent) {
+ final MetricsCollectorImpl mc = new MetricsCollectorImpl();
+ final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
+ /**
+ * snapshot all metrics regardless of being changed or not, in case no
+ * ops since last snapshot, we will get 0.
+ */
+ parent.innerMetrics.snapshot(rb, true);
+ Preconditions.checkState(mc.getRecords().size() == 1,
+ "There must be only one record and it's named with 'RatesRoller'");
+
+ // The record built above is not read; the per-metric values are taken
+ // from the inner metrics' global map instead.
+ parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
+ parent.rollOverAvgs();
+ }
+ parent.setChanged();
+ }
+ }
+
+ /**
+ * Iterates over snapshot to capture all Avg metrics into rolling structure
+ * {@link RollingAverages#averages}.
+ * <p>
+ * For each metric, the total and the number of samples reported by
+ * {@code lastStat()} are appended as the newest window; when the deque
+ * already holds {@code numWindows} entries the oldest one is evicted first.
+ * Does nothing before the first snapshot has been taken.
+ * </p>
+ */
+ private void rollOverAvgs() {
+ if (currentSnapshot == null) {
+ return;
+ }
+
+ for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
+ final MutableRate rate = entry.getValue();
+ // Get the metric's deque, creating one bounded to numWindows on first use.
+ final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
+ entry.getKey(),
+ new Function<String, LinkedBlockingDeque<SumAndCount>>() {
+ @Override
+ public LinkedBlockingDeque<SumAndCount> apply(String k) {
+ return new LinkedBlockingDeque<SumAndCount>(numWindows);
+ }
+ });
+ // presumably lastStat() holds the interval that just ended -- TODO confirm
+ final SumAndCount sumAndCount = new SumAndCount(
+ rate.lastStat().total(),
+ rate.lastStat().numSamples());
+ /* put newest sum and count to the end */
+ if (!deque.offerLast(sumAndCount)) {
+ // The deque is full: drop the oldest window to make room.
+ deque.pollFirst();
+ deque.offerLast(sumAndCount);
+ }
+ }
+
+ setChanged();
+ }
+
+ /**
+ * Cancels this instance's roll-over task, without interrupting a run that
+ * is already in progress. The shared scheduler itself is not shut down.
+ * Calling it again has no further effect.
+ */
+ @Override
+ public void close() throws IOException {
+ if (scheduledTask != null) {
+ scheduledTask.cancel(false);
+ }
+ scheduledTask = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
new file mode 100644
index 0000000..899d98c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.eq;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+/**
+ * This class tests various cases of the algorithms implemented in
+ * {@link RollingAverages}.
+ */
+public class TestRollingAverages {
+ /**
+ * Tests if the results are correct if no samples are inserted, dry run of
+ * empty roll over.
+ * <p>
+ * {@link RollingAverages} publishes its gauges through the {@code double}
+ * overload of {@code addGauge}, so the negative checks below match that
+ * overload with any value; checking another overload or a name the class
+ * never produces could not fail.
+ * </p>
+ */
+ @Test(timeout = 30000)
+ public void testRollingAveragesEmptyRollover() throws Exception {
+ final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+ /* 5s interval and 2 windows */
+ try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+ /* Check it initially */
+ rollingAverages.snapshot(rb, true);
+ verify(rb, never()).addGauge(
+ eq(info("FooRollingAvgTime", "Rolling average time for foo")),
+ anyDouble());
+ verify(rb, never()).addGauge(
+ eq(info("BarRollingAvgTime", "Rolling average time for bar")),
+ anyDouble());
+
+ /* sleep 6s longer than 5s interval to wait for rollover done */
+ Thread.sleep(6000);
+ rollingAverages.snapshot(rb, false);
+ verify(rb, never()).addGauge(
+ eq(info("FooRollingAvgTime", "Rolling average time for foo")),
+ anyDouble());
+ verify(rb, never()).addGauge(
+ eq(info("BarRollingAvgTime", "Rolling average time for bar")),
+ anyDouble());
+ }
+ }
+
+ /**
+ * Tests the case:
+ * <p>
+ * 5s interval and 2 sliding windows
+ * </p>
+ * <p>
+ * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
+ * 2...2] and [3, 3...3]
+ * </p>
+ */
+ @Test(timeout = 30000)
+ public void testRollingAveragesRollover() throws Exception {
+ final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+ final String name = "foo2";
+ final int windowSize = 5; // 5s roll over interval
+ final int numWindows = 2;
+ final int numOpsPerIteration = 1000;
+ try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+ numWindows)) {
+
+ /* Push values for three intervals */
+ final long start = Time.monotonicNow();
+ for (int i = 1; i <= 3; i++) {
+ /* insert value */
+ for (long j = 1; j <= numOpsPerIteration; j++) {
+ rollingAverages.add(name, i);
+ }
+
+ /**
+ * Sleep until 1s after the next windowSize seconds interval, to let the
+ * metrics roll over
+ */
+ final long sleep = (start + (windowSize * 1000 * i) + 1000)
+ - Time.monotonicNow();
+ Thread.sleep(sleep);
+
+ /* Verify that the window reset, check it has the values we pushed in */
+ rollingAverages.snapshot(rb, false);
+
+ /*
+ * Window #1 holds the value 1 repeated 1000 times, e.g. [1, 1...1];
+ * similarly window #2 holds [2, 2...2] and window #3 holds
+ * [3, 3...3]. Only the two newest windows are retained.
+ */
+ final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+ + numOpsPerIteration * i;
+ /* one empty window or all 2 windows full */
+ final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
+ : numOpsPerIteration;
+ verify(rb).addGauge(
+ info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+ rollingSum / rollingTotal);
+
+ /* Verify the metrics were added the right number of times */
+ verify(rb, times(i)).addGauge(
+ eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+ anyDouble());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 15bb0bd..50217a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -457,6 +457,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
+ // Settings for the rolling-average metrics and DataNode peer statistics.
+ // NOTE(review): the constant names say ROLLING_AVERAGES while the property
+ // strings say "rolling.average" (singular) -- confirm this is intended.
+ // presumably the window size is in seconds, as RollingAverages takes its
+ // window size in seconds -- TODO confirm against the code reading this key.
+ public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
+ "dfs.metrics.rolling.average.window.size";
+ public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
+ 3600;
+ // presumably the number of windows kept per metric -- TODO confirm.
+ public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
+ "dfs.metrics.rolling.average.window.numbers";
+ public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
+ 48;
+ // Peer statistics are disabled by default.
+ public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
+ "dfs.datanode.peer.stats.enabled";
+ public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 23cd44d..b3aee11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -93,6 +93,7 @@ class BlockReceiver implements Closeable {
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
+ private String bracketedMirrorAddr;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
@@ -119,6 +120,7 @@ class BlockReceiver implements Closeable {
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
+ private boolean isPenultimateNode = false;
private boolean syncOnClose;
private long restartBudget;
@@ -575,6 +577,7 @@ class BlockReceiver implements Closeable {
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
duration);
+ trackSendPacketToLastNodeInPipeline(duration);
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -822,6 +825,33 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
+ /**
+ * Only tracks the latency of sending packet to the last node in pipeline.
+ * This is a conscious design choice.
+ * <p>
+ * In the case of pipeline [dn0, dn1, dn2], 5ms latency from dn0 to dn1, 100ms
+ * from dn1 to dn2, NameNode claims dn2 is slow since it sees 100ms latency to
+ * dn2. Note that NameNode is not aware of pipeline structure in this context
+ * and only sees latency between two DataNodes.
+ * </p>
+ * <p>
+ * In another case of the same pipeline, 100ms latency from dn0 to dn1, 5ms
+ * from dn1 to dn2, NameNode will miss detecting dn1 being slow since it's not
+ * the last node. However the assumption is that in a busy enough cluster
+ * there are many other pipelines where dn1 is the last node, e.g. [dn3, dn4,
+ * dn1]. Also our tracking interval is relatively long enough (at least an
+ * hour) to improve the chances of the bad DataNodes being the last nodes in
+ * multiple pipelines.
+ * </p>
+ * <p>
+ * Nothing is recorded unless this node is the penultimate node of the
+ * pipeline and a mirror address is set. The sample is recorded under the
+ * mirror address wrapped in square brackets.
+ * </p>
+ * @param elapsedMs time taken to send the packet downstream, in milliseconds
+ */
+ private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
+ if (isPenultimateNode && mirrorAddr != null) {
+ datanode.getPeerMetrics().addSendPacketDownstream(
+ bracketedMirrorAddr,
+ elapsedMs);
+ }
+ }
+
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
return Arrays.copyOfRange(array, end - size, end);
}
@@ -886,7 +916,7 @@ class BlockReceiver implements Closeable {
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
.getRestartOOBStatus());
}
-
+
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@@ -895,14 +925,16 @@ class BlockReceiver implements Closeable {
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
- syncOnClose = datanode.getDnConf().syncOnClose;
- boolean responderClosed = false;
- mirrorOut = mirrOut;
- mirrorAddr = mirrAddr;
- throttler = throttlerArg;
+ syncOnClose = datanode.getDnConf().syncOnClose;
+ boolean responderClosed = false;
+ mirrorOut = mirrOut;
+ mirrorAddr = mirrAddr;
+ bracketedMirrorAddr = "[" + mirrAddr + "]";
+ isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+ throttler = throttlerArg;
- this.replyOut = replyOut;
- this.isReplaceBlock = isReplaceBlock;
+ this.replyOut = replyOut;
+ this.isReplaceBlock = isReplaceBlock;
try {
if (isClient && !isTransfer) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a94c4b1..4436e58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
@@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase
private int infoSecurePort;
DataNodeMetrics metrics;
+ private DataNodePeerMetrics peerMetrics;
private InetSocketAddress streamingAddr;
// See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1360,6 +1362,7 @@ public class DataNode extends ReconfigurableBase
initIpcServer();
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
+ peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -1755,11 +1758,15 @@ public class DataNode extends ReconfigurableBase
throw new IOException(ie.getMessage());
}
}
-
+
public DataNodeMetrics getMetrics() {
return metrics;
}
+ /**
+ * @return the peer metrics of this DataNode.
+ * NOTE(review): the field is only assigned during startup, so this
+ * presumably returns null before then -- confirm.
+ */
+ public DataNodePeerMetrics getPeerMetrics() {
+ return peerMetrics;
+ }
+
/** Ensure the authentication method is kerberos */
private void checkKerberosAuthMethod(String msg) throws IOException {
// User invoking the call must be same as the datanode user
@@ -3437,4 +3444,9 @@ public class DataNode extends ReconfigurableBase
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
+
+ /**
+ * Returns the peer metrics' send-packet-downstream averages as JSON text,
+ * as produced by the peer metrics object.
+ */
+ @Override // DataNodeMXBean
+ public String getSendPacketDownstreamAvgInfo() {
+ return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 37f9635..ccc5f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -125,4 +125,16 @@ public interface DataNodeMXBean {
* Gets the {@link FileIoProvider} statistics.
*/
String getFileIoProviderStatistics();
+
+ /**
+ * Gets the average info (e.g. time) of SendPacketDownstream when the DataNode
+ * acts as the penultimate (2nd to the last) node in the pipeline.
+ * <p>
+ * Example JSON:
+ * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+ * "[49.236.149.246:9801]RollingAvgTime":504.463,
+ * "[84.125.113.65:9801]RollingAvgTime":497.954}
+ * </p>
+ */
+ String getSendPacketDownstreamAvgInfo();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a35a5b4..abcaa4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -323,6 +323,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.error(s, t);
}
} finally {
+ collectThreadLocalStates();
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
@@ -335,6 +336,14 @@ class DataXceiver extends Receiver implements Runnable {
}
}
+ /**
+ * This thread is short-lived, so any thread-local state must be collected
+ * into the DataNode peer metrics before the thread terminates.
+ */
+ private void collectThreadLocalStates() {
+ datanode.getPeerMetrics().collectThreadLocalStates();
+ }
+
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
new file mode 100644
index 0000000..9344d1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsJsonBuilder;
+import org.apache.hadoop.metrics2.lib.RollingAverages;
+
+/**
+ * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
+ * various peer operations.
+ */
+@InterfaceAudience.Private
+public class DataNodePeerMetrics {
+
+  static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+
+  private final RollingAverages sendPacketDownstreamRollingAverages;
+
+  private final String name;
+  private final boolean peerStatsEnabled;
+
+  /**
+   * @param name name reported by {@link #name()}
+   * @param windowSize first argument handed to {@link RollingAverages}
+   * @param numWindows second argument handed to {@link RollingAverages}
+   * @param peerStatsEnabled when false, added samples are dropped
+   */
+  public DataNodePeerMetrics(final String name, final int windowSize,
+      final int numWindows, final boolean peerStatsEnabled) {
+    this.name = name;
+    this.peerStatsEnabled = peerStatsEnabled;
+    sendPacketDownstreamRollingAverages =
+        new RollingAverages(windowSize, numWindows);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  /**
+   * Creates an instance of DataNodePeerMetrics, used for registration. The
+   * window settings and the enabled flag are read from {@code conf}.
+   */
+  public static DataNodePeerMetrics create(Configuration conf, String dnName) {
+    final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
+        ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
+        : dnName.replace(':', '-'));
+
+    final int windowSize = conf.getInt(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+    final int numWindows = conf.getInt(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
+    final boolean peerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+    return new DataNodePeerMetrics(
+        name, windowSize, numWindows, peerStatsEnabled);
+  }
+
+  /**
+   * Adds invocation and elapsed time of SendPacketDownstream for peer. Does
+   * nothing when peer stats are disabled.
+   * <p>
+   * The caller should pass in a well-formatted peerAddr. e.g.
+   * "[192.168.1.110:1010]" is good. This will be translated into a fully
+   * qualified metric name, e.g. "[192.168.1.110:1010]AvgTime".
+   * </p>
+   */
+  public void addSendPacketDownstream(final String peerAddr,
+      final long elapsedMs) {
+    if (peerStatsEnabled) {
+      sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
+    }
+  }
+
+  /**
+   * Dump SendPacketDownstreamRollingAvgTime metrics as JSON.
+   */
+  public String dumpSendPacketDownstreamAvgInfoAsJson() {
+    final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
+    sendPacketDownstreamRollingAverages.snapshot(builder, true);
+    return builder.toString();
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    sendPacketDownstreamRollingAverages.collectThreadLocalStates();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 086f667..3389d84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1972,6 +1972,31 @@
</property>
<property>
+ <name>dfs.datanode.peer.stats.enabled</name>
+ <value>false</value>
+ <description>
+ A switch to turn on/off tracking DataNode peer statistics.
+ </description>
+</property>
+
+<property>
+ <name>dfs.metrics.rolling.average.window.size</name>
+ <value>3600</value>
+ <description>
+ The length in seconds of each window over which a subset of samples is
+ gathered to compute the rolling average, a.k.a. the roll-over interval.
+ </description>
+</property>
+
+<property>
+ <name>dfs.metrics.rolling.average.window.numbers</name>
+ <value>48</value>
+ <description>
+ The number of windows maintained to compute the rolling average.
+ </description>
+</property>
+
+<property>
<name>hadoop.user.group.metrics.percentiles.intervals</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
new file mode 100644
index 0000000..5af54a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * This class tests various cases of DataNode peer metrics.
+ */
+public class TestDataNodePeerMetrics {
+
+  @Test(timeout = 30000)
+  public void testGetSendPacketDownstreamAvgInfo() throws Exception {
+    final int windowSize = 5; // 5s roll over interval
+    final int numWindows = 2; // 2 rolling windows
+    final int iterations = 3;
+    final int numOpsPerIteration = 1000;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+        windowSize);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        numWindows);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+    final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
+        conf,
+        "Sample-DataNode");
+    final long start = Time.monotonicNow();
+    for (int i = 1; i <= iterations; i++) {
+      final String peerAddr = genPeerAddress();
+      for (int j = 1; j <= numOpsPerIteration; j++) {
+        /* simulate a latency of 1 to 999 ms (upper bound is exclusive) */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+
+      /*
+       * Sleep until 1s after the next windowSize seconds interval, to let the
+       * metrics roll over. Clamp at 0: Thread.sleep rejects negatives.
+       */
+      final long sleep = (start + (windowSize * 1000 * i) + 1000)
+          - Time.monotonicNow();
+      Thread.sleep(Math.max(0L, sleep));
+
+      /* dump avg info */
+      final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+      /*
+       * example json:
+       * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+       * "[49.236.149.246:9801]RollingAvgTime":504.463,
+       * "[84.125.113.65:9801]RollingAvgTime":497.954}
+       */
+      assertThat(json, containsString(peerAddr));
+    }
+  }
+
+  /**
+   * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
+   */
+  private String genPeerAddress() {
+    final ThreadLocalRandom r = ThreadLocalRandom.current();
+    return String.format("[%d.%d.%d.%d:9801]",
+        r.nextInt(1, 256), r.nextInt(1, 256),
+        r.nextInt(1, 256), r.nextInt(1, 256));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org