You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/01/16 06:07:39 UTC

[hadoop] branch trunk updated: HADOOP-16947. Stale records should be removed when MutableRollingAverages generates aggregate data. Contributed by Haibin Huang.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 97f843d  HADOOP-16947. Stale record should be remove when MutableRollingAverages generating aggregate data. Contributed by Haibin Huang.
97f843d is described below

commit 97f843de3a9e86159be5f2bb0cdf6d1ffa0af71d
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Sat Jan 16 14:06:56 2021 +0800

    HADOOP-16947. Stale record should be remove when MutableRollingAverages generating aggregate data. Contributed by Haibin Huang.
---
 .../metrics2/lib/MutableRatesWithAggregation.java  |  1 +
 .../metrics2/lib/MutableRollingAverages.java       | 50 +++++++++++++++--
 .../apache/hadoop/metrics2/lib/MutableStat.java    | 22 +++++++-
 .../server/datanode/TestDataNodePeerMetrics.java   | 64 +++++++++++++++++++++-
 4 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index 2079165..7795343 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -163,6 +163,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
     MutableRate metric = globalMetrics.get(name);
     if (metric == null) {
       metric = new MutableRate(name + typePrefix, name + typePrefix, false);
+      metric.setUpdateTimeStamp(true); // timestamp each snapshot
       globalMetrics.put(name, metric);
     }
     return metric;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
index e6111e3..1723362 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
 
 import javax.annotation.Nullable;
 
@@ -77,13 +78,26 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
   private final String avgInfoDescTemplate;
   private int numWindows;
 
+  /**
+   * Sum, count and snapshot time of one SampleStat window (immutable).
+   */
   private static class SumAndCount {
     private final double sum;
     private final long count;
-
-    SumAndCount(final double sum, final long count) {
+    private final long snapshotTimeStamp;
+
+    /**
+     * Constructor for {@link SumAndCount}.
+     *
+     * @param sum sum of the samples recorded in this window
+     * @param count number of samples recorded in this window
+     * @param snapshotTimeStamp monotonic time the snapshot was taken
+     */
+    SumAndCount(final double sum, final long count,
+        final long snapshotTimeStamp) {
       this.sum = sum;
       this.count = count;
+      this.snapshotTimeStamp = snapshotTimeStamp;
     }
 
     public double getSum() {
@@ -93,6 +107,10 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
     public long getCount() {
       return count;
     }
+
+    public long getSnapshotTimeStamp() {
+      return snapshotTimeStamp;
+    }
   }
 
   /**
@@ -111,6 +129,16 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
   private static final int NUM_WINDOWS_DEFAULT = 36;
 
   /**
+   * Time (ms) after which a record is considered stale and is ignored when
+   * computing averages. Defaults to the full sliding-window length
+   * (NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT). NOTE(review): it is not
+   * derived from the windows actually configured; confirm whether callers
+   * that change them must also call setRecordValidityMs.
+   */
+  private long recordValidityMs =
+      NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT;
+
+  /**
    * Constructor for {@link MutableRollingAverages}.
    * @param metricValueName
    */
@@ -231,7 +259,8 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
           });
       final SumAndCount sumAndCount = new SumAndCount(
           rate.lastStat().total(),
-          rate.lastStat().numSamples());
+          rate.lastStat().numSamples(),
+          rate.getSnapshotTimeStamp());
       /* put newest sum and count to the end */
       if (!deque.offerLast(sumAndCount)) {
         deque.pollFirst();
@@ -267,8 +296,11 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
       long totalCount = 0;
 
       for (final SumAndCount sumAndCount : entry.getValue()) {
-        totalCount += sumAndCount.getCount();
-        totalSum += sumAndCount.getSum();
+        if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
+            < recordValidityMs) { // ignore stale records
+          totalCount += sumAndCount.getCount();
+          totalSum += sumAndCount.getSum();
+        }
       }
 
       if (totalCount > minSamples) {
@@ -277,4 +309,12 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
     }
     return stats;
   }
+
+  /**
+   * Overrides the staleness threshold in ms; intended for tests only.
+   */
+  @VisibleForTesting
+  public synchronized void setRecordValidityMs(long value) {
+    this.recordValidityMs = value;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
index 5ef3178..e04b4b5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.util.SampleStat;
+import org.apache.hadoop.util.Time;
+
 import static org.apache.hadoop.metrics2.lib.Interns.*;
 
 /**
@@ -47,7 +49,9 @@ public class MutableStat extends MutableMetric {
   private final SampleStat prevStat = new SampleStat();
   private final SampleStat.MinMax minMax = new SampleStat.MinMax();
   private long numSamples = 0;
+  private long snapshotTimeStamp = 0; // monotonic time of last snapshot
   private boolean extended = false;
+  private boolean updateTimeStamp = false;
 
   /**
    * Construct a sample statistics metric
@@ -101,6 +105,13 @@ public class MutableStat extends MutableMetric {
   }
 
   /**
+   * Set whether to record the time at which each snapshot is taken.
+   * @param updateTimeStamp true to record snapshot timestamps
+   */
+  public synchronized void setUpdateTimeStamp(boolean updateTimeStamp) {
+    this.updateTimeStamp = updateTimeStamp;
+  }
+  /**
    * Add a number of samples and their sum to the running stat
    *
    * Note that although use of this method will preserve accurate mean values,
@@ -115,7 +126,7 @@ public class MutableStat extends MutableMetric {
   }
 
   /**
-   * Add a snapshot to the metric
+   * Add a snapshot to the metric.
    * @param value of the metric
    */
   public synchronized void add(long value) {
@@ -142,6 +153,9 @@ public class MutableStat extends MutableMetric {
         if (numSamples > 0) {
           intervalStat.copyTo(prevStat);
           intervalStat.reset();
+          if (updateTimeStamp) {
+            snapshotTimeStamp = Time.monotonicNow();
+          }
         }
         clearChanged();
       }
@@ -164,6 +178,12 @@ public class MutableStat extends MutableMetric {
     minMax.reset();
   }
 
+  /**
+   * Return the monotonic time of the last recorded snapshot (0 if none).
+   */
+  public synchronized long getSnapshotTimeStamp() {
+    return snapshotTimeStamp; // locked: long reads may tear (JLS 17.7)
+  }
   @Override
   public String toString() {
     return lastStat().toString();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index 3caf24d..41fb41f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -17,17 +17,24 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -43,7 +50,7 @@ public class TestDataNodePeerMetrics {
     final int numOpsPerIteration = 1000;
 
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+    conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
 
     final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
         "Sample-DataNode", conf);
@@ -80,6 +87,59 @@ public class TestDataNodePeerMetrics {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testRemoveStaleRecord() throws Exception {
+    final int numWindows = 5;
+    final long scheduleInterval = 1000;
+    final int numPeers = 3;
+    final int numSamples = 100;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
+        numSamples);
+    conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+    final DataNodePeerMetrics peerMetrics =
+        DataNodePeerMetrics.create("Sample-DataNode", conf);
+    MutableRollingAverages rollingAverages =
+        peerMetrics.getSendPacketDownstreamRollingAverages();
+    rollingAverages.setRecordValidityMs(numWindows * scheduleInterval);
+    MetricsTestHelper.replaceRollingAveragesScheduler(rollingAverages,
+        numWindows, scheduleInterval, TimeUnit.MILLISECONDS);
+
+    List<String> peerAddrList = new ArrayList<>();
+    for (int i = 1; i <= numPeers; i++) {
+      peerAddrList.add(genPeerAddress());
+    }
+    for (String peerAddr : peerAddrList) {
+      for (int j = 1; j <= numSamples; j++) {
+        /* simulate a latency of 1 to 999 ms */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+    }
+
+    GenericTestUtils.waitFor(
+        () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
+    assertEquals(numPeers, rollingAverages.getStats(numSamples).size());
+    /* wait until every record is older than the validity window */
+    GenericTestUtils.waitFor(
+        () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
+    assertEquals(0, rollingAverages.getStats(numSamples).size());
+
+    /* stats are reported again once the same peers send fresh samples */
+    for (String peerAddr : peerAddrList) {
+      for (int j = 1; j <= numSamples; j++) {
+        /* simulate a latency of 1 to 999 ms */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+    }
+    GenericTestUtils.waitFor(
+        () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
+    assertEquals(numPeers, rollingAverages.getStats(numSamples).size());
+  }
+
   /**
    * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org