Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2016/12/23 08:11:51 UTC

hadoop git commit: HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.

Repository: hadoop
Updated Branches:
  refs/heads/trunk e92a77099 -> 4e9029653


HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e902965
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e902965
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e902965

Branch: refs/heads/trunk
Commit: 4e9029653dfa7a803d73c173cb7044f7e0dc1eb1
Parents: e92a770
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Dec 22 23:46:58 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Dec 22 23:46:58 2016 -0800

----------------------------------------------------------------------
 .../hadoop/metrics2/MetricsJsonBuilder.java     | 125 +++++++++
 .../lib/MutableRatesWithAggregation.java        |  40 ++-
 .../hadoop/metrics2/lib/RollingAverages.java    | 251 +++++++++++++++++++
 .../metrics2/lib/TestRollingAverages.java       | 123 +++++++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  11 +
 .../hdfs/server/datanode/BlockReceiver.java     |  48 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  14 +-
 .../hdfs/server/datanode/DataNodeMXBean.java    |  12 +
 .../hdfs/server/datanode/DataXceiver.java       |   9 +
 .../datanode/metrics/DataNodePeerMetrics.java   | 117 +++++++++
 .../src/main/resources/hdfs-default.xml         |  25 ++
 .../datanode/TestDataNodePeerMetrics.java       |  92 +++++++
 12 files changed, 850 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
new file mode 100644
index 0000000..8e42909
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Build a JSON dump of the metrics.
+ *
+ * The {@link #toString()} method dumps out all values collected.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricsJsonBuilder extends MetricsRecordBuilder {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(MetricsJsonBuilder.class);
+  private final MetricsCollector parent;
+  private Map<String, Object> innerMetrics = new LinkedHashMap<>();
+
+  /**
+   * Build an instance.
+   * @param parent parent collector. Unused in this instance; only used for
+   * the {@link #parent()} method
+   */
+  public MetricsJsonBuilder(MetricsCollector parent) {
+    this.parent = parent;
+  }
+
+  private MetricsRecordBuilder tuple(String key, Object value) {
+    innerMetrics.put(key, value);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    return tuple(tag.name(), tag.value());
+  }
+
+  @Override
+  public MetricsRecordBuilder add(AbstractMetric metric) {
+    return tuple(metric.info().name(), metric.toString());
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tuple("context", value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+    return tuple(info.name(), value);
+  }
+
+  @Override
+  public MetricsCollector parent() {
+    return parent;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return new ObjectMapper().writeValueAsString(innerMetrics);
+    } catch (IOException e) {
+      LOG.warn("Failed to dump to Json.", e);
+      return ExceptionUtils.getStackTrace(e);
+    }
+  }
+}
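
For illustration, a minimal sketch (not part of this patch) that exercises the
builder's public API directly; the metric names and values are hypothetical:

    import org.apache.hadoop.metrics2.MetricsJsonBuilder;
    import org.apache.hadoop.metrics2.lib.Interns;

    public class JsonBuilderSketch {
      public static void main(String[] args) {
        MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
        builder.setContext("dfs"); // stored under the key "context"
        builder.addCounter(Interns.info("opsTotal", "total ops"), 42L);
        builder.addGauge(Interns.info("avgTime", "average time"), 1.5);
        // The backing LinkedHashMap keeps insertion order, so the dump is
        // deterministic: {"context":"dfs","opsTotal":42,"avgTime":1.5}
        System.out.println(builder);
      }
    }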

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index 64eae03..9827ca7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib;
 import com.google.common.collect.Sets;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat;
 @InterfaceStability.Evolving
 public class MutableRatesWithAggregation extends MutableMetric {
   static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
-  private final Map<String, MutableRate> globalMetrics = new HashMap<>();
+  private final Map<String, MutableRate> globalMetrics =
+      new ConcurrentHashMap<>();
   private final Set<Class<?>> protocolCache = Sets.newHashSet();
 
   private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
@@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
         // Thread has died; clean up its state
         iter.remove();
       } else {
-        // Aggregate the thread's local samples into the global metrics
-        for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
-          String name = entry.getKey();
-          MutableRate globalMetric = addMetricIfNotExists(name);
-          entry.getValue().snapshotInto(globalMetric);
-        }
+        aggregateLocalStatesToGlobalMetrics(map);
       }
     }
     for (MutableRate globalMetric : globalMetrics.values()) {
@@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric {
     }
   }
 
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  synchronized void collectThreadLocalStates() {
+    final ConcurrentMap<String, ThreadSafeSampleStat> localStats =
+        threadLocalMetricsMap.get();
+    if (localStats != null) {
+      aggregateLocalStatesToGlobalMetrics(localStats);
+    }
+  }
+
+  /**
+   * Aggregates the thread's local samples into the global metrics. The
+   * caller must ensure thread safety.
+   */
+  private void aggregateLocalStatesToGlobalMetrics(
+      final ConcurrentMap<String, ThreadSafeSampleStat> localStats) {
+    for (Map.Entry<String, ThreadSafeSampleStat> entry : localStats
+        .entrySet()) {
+      String name = entry.getKey();
+      MutableRate globalMetric = addMetricIfNotExists(name);
+      entry.getValue().snapshotInto(globalMetric);
+    }
+  }
+
+  Map<String, MutableRate> getGlobalMetrics() {
+    return globalMetrics;
+  }
+
   private synchronized MutableRate addMetricIfNotExists(String name) {
     MutableRate metric = globalMetrics.get(name);
     if (metric == null) {
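
The new collectThreadLocalStates() hook matters for short-lived threads:
samples are buffered in a per-thread map that the aggregator holds only via a
weak reference, so a thread that dies before the next snapshot could otherwise
lose its not-yet-aggregated samples. A minimal sketch (not part of this patch)
of the intended call pattern, placed in org.apache.hadoop.metrics2.lib since
collectThreadLocalStates() is package-private; the metric name is hypothetical:

    package org.apache.hadoop.metrics2.lib;

    public class ThreadLocalFlushSketch {
      public static void main(String[] args) throws InterruptedException {
        final MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
        Thread worker = new Thread(() -> {
          rates.add("opTime", 7L);          // recorded in this thread's local map
          rates.collectThreadLocalStates(); // flush before the thread exits
        });
        worker.start();
        worker.join();
        // A later snapshot() still sees the sample even after the worker's
        // weakly referenced local map has been cleaned up.
      }
    }

This is exactly the pattern the DataXceiver change below adopts in its finally
block.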

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
new file mode 100644
index 0000000..06ae30d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * <p>
+ * This class maintains a group of rolling average metrics. It implements a
+ * rolling average algorithm: a fixed number of sliding windows roll over
+ * periodically, evicting the oldest subset of samples. Each window covers a
+ * subset of the sample stream, for which a sub-sum and sub-count are
+ * collected. The sub-sums and sub-counts across all windows are aggregated
+ * into a final sum and count that yield the final, i.e. rolling, average.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingAverages extends MutableMetric implements Closeable {
+
+  private final MutableRatesWithAggregation innerMetrics =
+      new MutableRatesWithAggregation();
+
+  private static final ScheduledExecutorService SCHEDULER = Executors
+      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("RollingAverages-%d").build());
+
+  private ScheduledFuture<?> scheduledTask = null;
+  private Map<String, MutableRate> currentSnapshot;
+  private final int numWindows;
+  private final String avgInfoNameTemplate;
+  private final String avgInfoDescTemplate;
+
+  private static class SumAndCount {
+    private final double sum;
+    private final long count;
+
+    public SumAndCount(final double sum, final long count) {
+      this.sum = sum;
+      this.count = count;
+    }
+
+    public double getSum() {
+      return sum;
+    }
+
+    public long getCount() {
+      return count;
+    }
+  }
+
+  /**
+   * <p>
+   * key: metric name
+   * </p>
+   * <p>
+ * value: deque where the sub-sums and sub-counts for the sliding windows
+ * are maintained.
+   * </p>
+   */
+  private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Constructor of {@link RollingAverages}.
+   * @param windowSize
+   *          The number of seconds of each window for which a subset of
+   *          samples is gathered to compute the rolling average, a.k.a. the
+   *          roll-over interval.
+   * @param numWindows
+   *          The number of windows maintained to compute the rolling average.
+   * @param valueName
+   *          of the metric (e.g. "Time", "Latency")
+   */
+  public RollingAverages(
+      final int windowSize,
+      final int numWindows,
+      final String valueName) {
+    String uvName = StringUtils.capitalize(valueName);
+    String lvName = StringUtils.uncapitalize(valueName);
+    avgInfoNameTemplate = "%s" + "RollingAvg" + uvName;
+    avgInfoDescTemplate = "Rolling average " + lvName + " for " + "%s";
+    this.numWindows = numWindows;
+    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+        windowSize, windowSize, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Constructor of {@link RollingAverages}.
+   * @param windowSize
+   *          The number of seconds of each window for which a subset of
+   *          samples is gathered to compute the rolling average, a.k.a. the
+   *          roll-over interval.
+   * @param numWindows
+   *          The number of windows maintained at the same time to compute the
+   *          rolling average.
+   */
+  public RollingAverages(
+      final int windowSize,
+      final int numWindows) {
+    this(windowSize, numWindows, "Time");
+  }
+
+  @Override
+  public void snapshot(MetricsRecordBuilder builder, boolean all) {
+    if (all || changed()) {
+      for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+          : averages.entrySet()) {
+        final String name = entry.getKey();
+        final MetricsInfo avgInfo = info(
+            String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
+            String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
+        double totalSum = 0;
+        long totalCount = 0;
+
+        for (final SumAndCount sumAndCount : entry.getValue()) {
+          totalCount += sumAndCount.getCount();
+          totalSum += sumAndCount.getSum();
+        }
+
+        if (totalCount != 0) {
+          builder.addGauge(avgInfo, totalSum / totalCount);
+        }
+      }
+      if (changed()) {
+        clearChanged();
+      }
+    }
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    innerMetrics.collectThreadLocalStates();
+  }
+
+  /**
+   * @param name
+   *          name of metric
+   * @param value
+   *          value of metric
+   */
+  public void add(final String name, final long value) {
+    innerMetrics.add(name, value);
+  }
+
+  private static class RatesRoller implements Runnable {
+    private final RollingAverages parent;
+
+    public RatesRoller(final RollingAverages parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void run() {
+      synchronized (parent) {
+        final MetricsCollectorImpl mc = new MetricsCollectorImpl();
+        final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
+        /*
+         * Snapshot all metrics whether they have changed or not; if there
+         * have been no ops since the last snapshot, we get 0.
+         */
+        parent.innerMetrics.snapshot(rb, true);
+        Preconditions.checkState(mc.getRecords().size() == 1,
+            "There must be only one record and it's named with 'RatesRoller'");
+
+        parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
+        parent.rollOverAvgs();
+      }
+      parent.setChanged();
+    }
+  }
+
+  /**
+   * Iterates over the snapshot to capture all average metrics into the
+   * rolling structure {@link RollingAverages#averages}.
+   */
+  private void rollOverAvgs() {
+    if (currentSnapshot == null) {
+      return;
+    }
+
+    for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
+      final MutableRate rate = entry.getValue();
+      final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
+          entry.getKey(),
+          new Function<String, LinkedBlockingDeque<SumAndCount>>() {
+            @Override
+            public LinkedBlockingDeque<SumAndCount> apply(String k) {
+              return new LinkedBlockingDeque<SumAndCount>(numWindows);
+            }
+          });
+      final SumAndCount sumAndCount = new SumAndCount(
+          rate.lastStat().total(),
+          rate.lastStat().numSamples());
+      /* put newest sum and count to the end */
+      if (!deque.offerLast(sumAndCount)) {
+        deque.pollFirst();
+        deque.offerLast(sumAndCount);
+      }
+    }
+
+    setChanged();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (scheduledTask != null) {
+      scheduledTask.cancel(false);
+    }
+    scheduledTask = null;
+  }
+}
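
To make the windowed arithmetic concrete, a stand-alone sketch (not part of
this patch) of what rollOverAvgs() and snapshot() compute together: each
roll-over appends one (sum, count) pair and evicts the oldest once numWindows
pairs exist, and the reported gauge is the ratio of the aggregated sums to the
aggregated counts. The per-window values mirror TestRollingAverages below:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class RollingAvgArithmetic {
      public static void main(String[] args) {
        final int numWindows = 2;
        Deque<double[]> windows = new ArrayDeque<>(); // each entry: {sum, count}
        for (int i = 1; i <= 3; i++) {
          if (windows.size() == numWindows) {
            windows.pollFirst();                      // evict the oldest window
          }
          windows.offerLast(new double[] {1000.0 * i, 1000.0});
          double sum = 0, count = 0;
          for (double[] w : windows) {
            sum += w[0];
            count += w[1];
          }
          System.out.println("rolling avg after window " + i + " = " + sum / count);
        }
        // Prints 1.0, then (1000+2000)/2000 = 1.5, then (2000+3000)/2000 = 2.5.
      }
    }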

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
new file mode 100644
index 0000000..899d98c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.eq;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+/**
+ * This class tests various cases of the algorithms implemented in
+ * {@link RollingAverages}.
+ */
+public class TestRollingAverages {
+  /**
+   * Tests that the results are correct when no samples are inserted: a dry
+   * run of an empty roll-over.
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesEmptyRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    /* 5s interval and 2 windows */
+    try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+      /* Check it initially */
+      rollingAverages.snapshot(rb, true);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+
+      /* sleep 6s, longer than the 5s interval, to let the roll-over finish */
+      Thread.sleep(6000);
+      rollingAverages.snapshot(rb, false);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+    }
+  }
+
+  /**
+   * Tests the case:
+   * <p>
+   * 5s interval and 2 sliding windows
+   * </p>
+   * <p>
+   * sample stream: 1000 samples each of 1, 2, and 3, i.e. [1, 1...1],
+   * [2, 2...2] and [3, 3...3]
+   * </p>
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    final String name = "foo2";
+    final int windowSize = 5; // 5s roll over interval
+    final int numWindows = 2;
+    final int numOpsPerIteration = 1000;
+    try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+        numWindows)) {
+
+      /* Push values for three intervals */
+      final long start = Time.monotonicNow();
+      for (int i = 1; i <= 3; i++) {
+        /* insert value */
+        for (long j = 1; j <= numOpsPerIteration; j++) {
+          rollingAverages.add(name, i);
+        }
+
+        /*
+         * Sleep until 1s after the next windowSize-second boundary, to let
+         * the metrics roll over.
+         */
+        final long sleep = (start + (windowSize * 1000 * i) + 1000)
+            - Time.monotonicNow();
+        Thread.sleep(sleep);
+
+        /* Verify that the window rolled over and holds the values we pushed in */
+        rollingAverages.snapshot(rb, false);
+
+        /*
+         * Window #1 holds a series of 1s repeated 1000 times, e.g.
+         * [1, 1...1]; similarly window #2 holds [2, 2...2] and window #3
+         * holds [3, 3...3].
+         */
+        final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+            + numOpsPerIteration * i;
+        /* only one window filled on the first iteration; both full afterwards */
+        final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
+            : numOpsPerIteration;
+        verify(rb).addGauge(
+            info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+            rollingSum / rollingTotal);
+
+        /* Verify the metrics were added the right number of times */
+        verify(rb, times(i)).addGauge(
+            eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+            anyDouble());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 15bb0bd..50217a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -457,6 +457,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_METRICS_SESSION_ID_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
+  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
+      "dfs.metrics.rolling.average.window.size";
+  public static final int     DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
+      3600;
+  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
+      "dfs.metrics.rolling.average.window.numbers";
+  public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
+      48;
+  public static final String  DFS_DATANODE_PEER_STATS_ENABLED_KEY =
+      "dfs.datanode.peer.stats.enabled";
+  public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
   public static final String  DFS_DATANODE_HOST_NAME_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
   public static final String  DFS_NAMENODE_CHECKPOINT_DIR_KEY =
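
With the defaults, each window spans 3600 seconds and 48 windows are kept, so
the rolling average covers 3600 s x 48 = 172800 s = 48 hours, rolled over
hourly. A sketch (not part of this patch, mirroring the test below) of wiring
the keys up programmatically, with illustrative values of 5-minute windows and
12 windows for a one-hour span:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PeerStatsConfSketch {
      public static Configuration build() {
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
        conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, 300);
        conf.setInt(
            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, 12);
        return conf;
      }
    }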

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 23cd44d..b3aee11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -93,6 +93,7 @@ class BlockReceiver implements Closeable {
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
+  private String bracketedMirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
   private DataTransferThrottler throttler;
@@ -119,6 +120,7 @@ class BlockReceiver implements Closeable {
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
+  private boolean isPenultimateNode = false;
 
   private boolean syncOnClose;
   private long restartBudget;
@@ -575,6 +577,7 @@ class BlockReceiver implements Closeable {
         DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
             mirrorAddr,
             duration);
+        trackSendPacketToLastNodeInPipeline(duration);
         if (duration > datanodeSlowLogThresholdMs) {
           LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
               + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -822,6 +825,33 @@ class BlockReceiver implements Closeable {
     return lastPacketInBlock?-1:len;
   }
 
+  /**
+   * Only tracks the latency of sending a packet to the last node in the
+   * pipeline. This is a conscious design choice.
+   * <p>
+   * In a pipeline [dn0, dn1, dn2] with 5ms latency from dn0 to dn1 and 100ms
+   * from dn1 to dn2, the NameNode concludes dn2 is slow since it sees the
+   * 100ms latency to dn2. Note that the NameNode is not aware of the pipeline
+   * structure in this context and only sees the latency between two
+   * DataNodes.
+   * </p>
+   * <p>
+   * In the reverse case on the same pipeline, 100ms latency from dn0 to dn1
+   * and 5ms from dn1 to dn2, the NameNode misses that dn1 is slow since dn1
+   * is not the last node. However, the assumption is that in a busy enough
+   * cluster there are many other pipelines where dn1 is the last node, e.g.
+   * [dn3, dn4, dn1]. Also, the tracking interval is long enough (an hour by
+   * default) to improve the chances of a bad DataNode being the last node in
+   * multiple pipelines.
+   * </p>
+   */
+  private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
+    if (isPenultimateNode && mirrorAddr != null) {
+      datanode.getPeerMetrics().addSendPacketDownstream(
+          bracketedMirrorAddr,
+          elapsedMs);
+    }
+  }
+
   private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
     return Arrays.copyOfRange(array, end - size, end);
   }
@@ -886,7 +916,7 @@ class BlockReceiver implements Closeable {
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }
-  
+
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
@@ -895,14 +925,16 @@ class BlockReceiver implements Closeable {
       DatanodeInfo[] downstreams,
       boolean isReplaceBlock) throws IOException {
 
-      syncOnClose = datanode.getDnConf().syncOnClose;
-      boolean responderClosed = false;
-      mirrorOut = mirrOut;
-      mirrorAddr = mirrAddr;
-      throttler = throttlerArg;
+    syncOnClose = datanode.getDnConf().syncOnClose;
+    boolean responderClosed = false;
+    mirrorOut = mirrOut;
+    mirrorAddr = mirrAddr;
+    bracketedMirrorAddr = "[" + mirrAddr + "]";
+    isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+    throttler = throttlerArg;
 
-      this.replyOut = replyOut;
-      this.isReplaceBlock = isReplaceBlock;
+    this.replyOut = replyOut;
+    this.isReplaceBlock = isReplaceBlock;
 
     try {
       if (isClient && !isTransfer) {
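
To restate the penultimate-node check added to receiveBlock() above: a
DataNode only records the downstream latency when exactly one downstream
remains, i.e. when its mirror is the last node in the pipeline. A stand-alone
illustration (hypothetical helper, not part of the patch):

    public class PipelinePositionSketch {
      // For pipeline [dn0, dn1, dn2]:
      //   dn0 sees downstreams {dn1, dn2} (length 2) -> not tracked;
      //   dn1 sees downstreams {dn2}      (length 1) -> penultimate, tracked;
      //   dn2 sees no downstreams         (length 0) -> last node, no mirror.
      static boolean isPenultimate(String[] downstreams) {
        return downstreams != null && downstreams.length == 1;
      }
    }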

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a94c4b1..4436e58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
 import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
@@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase
   private int infoSecurePort;
 
   DataNodeMetrics metrics;
+  private DataNodePeerMetrics peerMetrics;
   private InetSocketAddress streamingAddr;
   
   // See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1360,6 +1362,7 @@ public class DataNode extends ReconfigurableBase
     initIpcServer();
 
     metrics = DataNodeMetrics.create(getConf(), getDisplayName());
+    peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -1755,11 +1758,15 @@ public class DataNode extends ReconfigurableBase
       throw new IOException(ie.getMessage());
     }
   }
-    
+
   public DataNodeMetrics getMetrics() {
     return metrics;
   }
   
+  public DataNodePeerMetrics getPeerMetrics() {
+    return peerMetrics;
+  }
+
   /** Ensure the authentication method is kerberos */
   private void checkKerberosAuthMethod(String msg) throws IOException {
     // User invoking the call must be same as the datanode user
@@ -3437,4 +3444,9 @@ public class DataNode extends ReconfigurableBase
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
+
+  @Override // DataNodeMXBean
+  public String getSendPacketDownstreamAvgInfo() {
+    return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 37f9635..ccc5f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -125,4 +125,16 @@ public interface DataNodeMXBean {
    * Gets the {@link FileIoProvider} statistics.
    */
   String getFileIoProviderStatistics();
+
+  /**
+   * Gets the average info (e.g. time) of SendPacketDownstream when the
+   * DataNode acts as the penultimate (second-to-last) node in the pipeline.
+   * <p>
+   * Example JSON:
+   * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+   *  "[49.236.149.246:9801]RollingAvgTime":504.463,
+   *  "[84.125.113.65:9801]RollingAvgTime":497.954}
+   * </p>
+   */
+  String getSendPacketDownstreamAvgInfo();
 }
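
A sketch (not part of this patch) of reading the new attribute through JMX
from inside the DataNode JVM; the ObjectName below assumes the DataNode's
usual "Hadoop:service=DataNode,name=DataNodeInfo" registration, so adjust it
if your deployment registers differently:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class PeerAvgInfoProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName dn =
            new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
        // JMX derives the attribute name from the getter by dropping "get".
        String json = (String) mbs.getAttribute(dn, "SendPacketDownstreamAvgInfo");
        System.out.println(json);
      }
    }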

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a35a5b4..abcaa4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -323,6 +323,7 @@ class DataXceiver extends Receiver implements Runnable {
         LOG.error(s, t);
       }
     } finally {
+      collectThreadLocalStates();
       if (LOG.isDebugEnabled()) {
         LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
             + datanode.getXceiverCount());
@@ -335,6 +336,14 @@ class DataXceiver extends Receiver implements Runnable {
     }
   }
 
+  /**
+   * In this short-lived thread, any thread-local state should be collected
+   * before the thread dies.
+   */
+  private void collectThreadLocalStates() {
+    datanode.getPeerMetrics().collectThreadLocalStates();
+  }
+
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> token,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
new file mode 100644
index 0000000..9344d1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsJsonBuilder;
+import org.apache.hadoop.metrics2.lib.RollingAverages;
+
+/**
+ * This class maintains DataNode peer metrics (e.g. numOps, AvgTime) for
+ * various peer operations.
+ */
+@InterfaceAudience.Private
+public class DataNodePeerMetrics {
+
+  static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+
+  private final RollingAverages sendPacketDownstreamRollingAverages;
+
+  private final String name;
+  private final boolean peerStatsEnabled;
+
+  public DataNodePeerMetrics(
+      final String name,
+      final int windowSize,
+      final int numWindows,
+      final boolean peerStatsEnabled) {
+    this.name = name;
+    this.peerStatsEnabled = peerStatsEnabled;
+    sendPacketDownstreamRollingAverages = new RollingAverages(
+        windowSize,
+        numWindows);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  /**
+   * Creates an instance of DataNodePeerMetrics, used for registration.
+   */
+  public static DataNodePeerMetrics create(Configuration conf, String dnName) {
+    final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
+        ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
+        : dnName.replace(':', '-'));
+
+    final int windowSize = conf.getInt(
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+    final int numWindows = conf.getInt(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
+    final boolean peerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+    return new DataNodePeerMetrics(
+        name,
+        windowSize,
+        numWindows,
+        peerStatsEnabled);
+  }
+
+  /**
+   * Adds invocation and elapsed time of SendPacketDownstream for peer.
+   * <p>
+   * The caller should pass in a well-formatted peerAddr, e.g.
+   * "[192.168.1.110:1010]". This is translated into a fully qualified metric
+   * name, e.g. "[192.168.1.110:1010]RollingAvgTime".
+   * </p>
+   */
+  public void addSendPacketDownstream(
+      final String peerAddr,
+      final long elapsedMs) {
+    if (peerStatsEnabled) {
+      sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
+    }
+  }
+
+  /**
+   * Dump SendPacketDownstreamRollingAvgTime metrics as JSON.
+   */
+  public String dumpSendPacketDownstreamAvgInfoAsJson() {
+    final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
+    sendPacketDownstreamRollingAverages.snapshot(builder, true);
+    return builder.toString();
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    sendPacketDownstreamRollingAverages.collectThreadLocalStates();
+  }
+}
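
For reference, a small sketch (not part of this patch) of how the peer address
becomes the emitted gauge name: RollingAverages formats each metric as
capitalize(name) + "RollingAvgTime" (see avgInfoNameTemplate above), and since
the address starts with '[', capitalization leaves it unchanged:

    import org.apache.commons.lang.StringUtils;

    public class GaugeNameSketch {
      public static void main(String[] args) {
        String peerAddr = "[192.168.1.110:1010]"; // hypothetical peer
        String gaugeName =
            String.format("%sRollingAvgTime", StringUtils.capitalize(peerAddr));
        System.out.println(gaugeName); // [192.168.1.110:1010]RollingAvgTime
      }
    }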

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 086f667..3389d84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1972,6 +1972,31 @@
 </property>
 
 <property>
+  <name>dfs.datanode.peer.stats.enabled</name>
+  <value>false</value>
+  <description>
+    A switch to turn on/off tracking DataNode peer statistics.
+  </description>
+</property>
+
+<property>
+  <name>dfs.metrics.rolling.average.window.size</name>
+  <value>3600</value>
+  <description>
+    The number of seconds of each window for which a subset of samples is
+    gathered to compute the rolling average, a.k.a. the roll-over interval.
+  </description>
+</property>
+
+<property>
+  <name>dfs.metrics.rolling.average.window.numbers</name>
+  <value>48</value>
+  <description>
+    The number of windows maintained to compute the rolling average.
+  </description>
+</property>
+
+<property>
   <name>hadoop.user.group.metrics.percentiles.intervals</name>
   <value></value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e902965/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
new file mode 100644
index 0000000..5af54a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * This class tests various cases of DataNode peer metrics.
+ */
+public class TestDataNodePeerMetrics {
+
+  @Test(timeout = 30000)
+  public void testGetSendPacketDownstreamAvgInfo() throws Exception {
+    final int windowSize = 5; // 5s roll over interval
+    final int numWindows = 2; // 2 rolling windows
+    final int iterations = 3;
+    final int numOpsPerIteration = 1000;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
+        windowSize);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+        numWindows);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+    final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
+        conf,
+        "Sample-DataNode");
+    final long start = Time.monotonicNow();
+    for (int i = 1; i <= iterations; i++) {
+      final String peerAddr = genPeerAddress();
+      for (int j = 1; j <= numOpsPerIteration; j++) {
+        /* simulate a latency of 1 to 1000 ms */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+
+      /*
+       * Sleep until 1s after the next windowSize-second boundary, to let the
+       * metrics roll over.
+       */
+      final long sleep = (start + (windowSize * 1000 * i) + 1000)
+          - Time.monotonicNow();
+      Thread.sleep(sleep);
+
+      /* dump avg info */
+      final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+      /*
+       * example json:
+       * {"[185.164.159.81:9801]RollingAvgTime":504.867,
+       *  "[49.236.149.246:9801]RollingAvgTime":504.463,
+       *  "[84.125.113.65:9801]RollingAvgTime":497.954}
+       */
+      assertThat(json, containsString(peerAddr));
+    }
+  }
+
+  /**
+   * Generates random peer addresses to simulate different peers, e.g.
+   * [84.125.113.65:9801].
+   */
+  private String genPeerAddress() {
+    final ThreadLocalRandom r = ThreadLocalRandom.current();
+    return String.format("[%d.%d.%d.%d:9801]",
+        r.nextInt(1, 256), r.nextInt(1, 256),
+        r.nextInt(1, 256), r.nextInt(1, 256));
+  }
+}

