You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/08/23 06:20:12 UTC

[iotdb] branch rel/1.2 updated: [To rel/1.2][Metric] Fix flush point statistics (#10934)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 264f908bf99 [To rel/1.2][Metric] Fix flush point statistics (#10934)
264f908bf99 is described below

commit 264f908bf99342b442878a6cf8e844da628a4ad3
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Wed Aug 23 14:20:06 2023 +0800

    [To rel/1.2][Metric] Fix flush point statistics (#10934)
---
 .../dataregion/flush/MemTableFlushTask.java        | 58 +++++++++++++++-------
 .../iotdb/metrics/AbstractMetricService.java       | 45 +++++++++--------
 .../metrics/reporter/iotdb/IoTDBReporter.java      |  6 +--
 3 files changed, 69 insertions(+), 40 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 9a5db18bb28..2b7ada3ead2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -42,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,6 +60,8 @@ public class MemTableFlushTask {
       FlushSubTaskPoolManager.getInstance();
   private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  /* storage group name -> last time */
+  private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
@@ -278,23 +282,7 @@ public class MemTableFlushTask {
             Thread.currentThread().interrupt();
           }
 
-          if (!storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
-            int lastIndex = storageGroup.lastIndexOf("-");
-            if (lastIndex == -1) {
-              lastIndex = storageGroup.length();
-            }
-            MetricService.getInstance()
-                .gaugeWithInternalReportAsync(
-                    memTable.getTotalPointsNum(),
-                    Metric.POINTS.toString(),
-                    MetricLevel.CORE,
-                    Tag.DATABASE.toString(),
-                    storageGroup.substring(0, lastIndex),
-                    Tag.TYPE.toString(),
-                    "flush",
-                    Tag.REGION.toString(),
-                    dataRegionId);
-          }
+          recordFlushPointsMetric();
 
           LOGGER.info(
               "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
@@ -305,6 +293,42 @@ public class MemTableFlushTask {
         }
       };
 
+  private void recordFlushPointsMetric() {
+    if (storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
+      return;
+    }
+    int lastIndex = storageGroup.lastIndexOf("-");
+    if (lastIndex == -1) {
+      lastIndex = storageGroup.length();
+    }
+    String storageGroupName = storageGroup.substring(0, lastIndex);
+    long currentTime = DateTimeUtils.currentTime();
+    // compute the flush points
+    long writeTime =
+        flushPointsCache.compute(
+            storageGroupName,
+            (storageGroup, lastTime) -> {
+              if (lastTime == null || lastTime != currentTime) {
+                return currentTime;
+              } else {
+                return currentTime + 1;
+              }
+            });
+    // record the flush points
+    MetricService.getInstance()
+        .gaugeWithInternalReportAsync(
+            memTable.getTotalPointsNum(),
+            Metric.POINTS.toString(),
+            MetricLevel.CORE,
+            writeTime,
+            Tag.DATABASE.toString(),
+            storageGroup.substring(0, lastIndex),
+            Tag.TYPE.toString(),
+            "flush",
+            Tag.REGION.toString(),
+            dataRegionId);
+  }
+
   /** io task (third task of pipeline) */
   @SuppressWarnings("squid:S135")
   private Runnable ioTask =
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
index 5ac5a3737f2..06e5632ae6a 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
@@ -271,9 +271,9 @@ public abstract class AbstractMetricService {
 
   /** GetOrCreateCounter with internal report. */
   public Counter getOrCreateCounterWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Counter counter = metricManager.getOrCreateCounter(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(counter, metric, tags);
+    internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
     return counter;
   }
 
@@ -287,69 +287,74 @@ public abstract class AbstractMetricService {
 
   /** GetOrCreateGauge with internal report. */
   public Gauge getOrCreateGaugeWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(gauge, metric, tags);
+    internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
     return gauge;
   }
 
   /** GetOrCreateRate with internal report. */
   public Rate getOrCreateRateWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(rate, metric, tags);
+    internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
     return rate;
   }
 
   /** GetOrCreateHistogram with internal report. */
   public Histogram getOrCreateHistogramWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Histogram histogram = metricManager.getOrCreateHistogram(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(histogram, metric, tags);
+    internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
     return histogram;
   }
 
   /** GetOrCreateTimer with internal report. */
   public Timer getOrCreateTimerWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(timer, metric, tags);
+    internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
     return timer;
   }
 
   /** Count with internal report. */
   public void countWithInternalReportAsync(
-      long delta, String metric, MetricLevel metricLevel, String... tags) {
+      long delta, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.count(delta, metric, metricLevel, tags), metric, tags);
+        metricManager.count(delta, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Gauge value with internal report. */
   public void gaugeWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.gauge(value, metric, metricLevel, tags), metric, tags);
+        metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Rate with internal report. */
   public void rateWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.rate(value, metric, metricLevel, tags), metric, tags);
+        metricManager.rate(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Histogram with internal report. */
   public void histogramWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.histogram(value, metric, metricLevel, tags), metric, tags);
+        metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Timer with internal report. */
   public void timerWithInternalReportAsync(
-      long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) {
+      long delta,
+      TimeUnit timeUnit,
+      String metric,
+      MetricLevel metricLevel,
+      long time,
+      String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags);
+        metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags);
   }
 
   public List<Pair<String, String[]>> getAllMetricKeys() {
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
index 2cc67f13a31..db1e915c8e9 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
@@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter {
    *
    * @param metric the target metric
    * @param name the name of metric
+   * @param time the target time of metric
    * @param tags the tags of metric
    */
-  public void writeMetricToIoTDB(IMetric metric, String name, String... tags) {
+  public void writeMetricToIoTDB(IMetric metric, String name, long time, String... tags) {
     if (!(metric instanceof DoNothingMetric)) {
       Map<String, Object> values = new HashMap<>();
       metric.constructValueMap(values);
-      writeMetricToIoTDB(
-          values, IoTDBMetricsUtils.generatePath(name, tags), System.currentTimeMillis());
+      writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags), time);
     }
   }