You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/08/23 06:20:12 UTC
[iotdb] branch rel/1.2 updated: [To rel/1.2][Metric] Fix flush point statistics (#10934)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 264f908bf99 [To rel/1.2][Metric] Fix flush point statistics (#10934)
264f908bf99 is described below
commit 264f908bf99342b442878a6cf8e844da628a4ad3
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Wed Aug 23 14:20:06 2023 +0800
[To rel/1.2][Metric] Fix flush point statistics (#10934)
---
.../dataregion/flush/MemTableFlushTask.java | 58 +++++++++++++++-------
.../iotdb/metrics/AbstractMetricService.java | 45 +++++++++--------
.../metrics/reporter/iotdb/IoTDBReporter.java | 6 +--
3 files changed, 69 insertions(+), 40 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 9a5db18bb28..2b7ada3ead2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -42,6 +43,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -58,6 +60,8 @@ public class MemTableFlushTask {
FlushSubTaskPoolManager.getInstance();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ /* storage group name -> timestamp assigned to its most recent flush points record */
+ private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
@@ -278,23 +282,7 @@ public class MemTableFlushTask {
Thread.currentThread().interrupt();
}
- if (!storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
- int lastIndex = storageGroup.lastIndexOf("-");
- if (lastIndex == -1) {
- lastIndex = storageGroup.length();
- }
- MetricService.getInstance()
- .gaugeWithInternalReportAsync(
- memTable.getTotalPointsNum(),
- Metric.POINTS.toString(),
- MetricLevel.CORE,
- Tag.DATABASE.toString(),
- storageGroup.substring(0, lastIndex),
- Tag.TYPE.toString(),
- "flush",
- Tag.REGION.toString(),
- dataRegionId);
- }
+ recordFlushPointsMetric();
LOGGER.info(
"Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
@@ -305,6 +293,42 @@ public class MemTableFlushTask {
}
};
+ /**
+  * Reports the total point count of the flushed memtable as the "flush" POINTS gauge, tagged with
+  * the database name and data region id, and written with an explicit timestamp.
+  *
+  * <p>Nothing is reported when the storage group name starts with {@link
+  * IoTDBConfig#SYSTEM_DATABASE}. The database tag is the storage group name truncated at its last
+  * '-' (the whole name when it contains no '-').
+  *
+  * <p>The timestamp handed to the reporter is strictly increasing per database: it is the current
+  * time when that is later than the previously used timestamp, otherwise the previous timestamp
+  * plus one. This keeps several flushes of the same database that observe the same clock value
+  * (or a clock that stepped backwards) from being written at an already used timestamp.
+  */
+ private void recordFlushPointsMetric() {
+   if (storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
+     return;
+   }
+   int lastIndex = storageGroup.lastIndexOf("-");
+   if (lastIndex == -1) {
+     lastIndex = storageGroup.length();
+   }
+   String storageGroupName = storageGroup.substring(0, lastIndex);
+   long currentTime = DateTimeUtils.currentTime();
+   // Pick the write timestamp atomically per database. Comparing with '<' rather than '!='
+   // matters: after one bump to currentTime + 1, a further flush in the same tick must move on
+   // to currentTime + 2 instead of falling back to currentTime and colliding with the first one.
+   long writeTime =
+       flushPointsCache.compute(
+           storageGroupName,
+           (name, lastTime) ->
+               (lastTime == null || lastTime < currentTime) ? currentTime : lastTime + 1);
+   // record the flush points
+   MetricService.getInstance()
+       .gaugeWithInternalReportAsync(
+           memTable.getTotalPointsNum(),
+           Metric.POINTS.toString(),
+           MetricLevel.CORE,
+           writeTime,
+           Tag.DATABASE.toString(),
+           storageGroupName,
+           Tag.TYPE.toString(),
+           "flush",
+           Tag.REGION.toString(),
+           dataRegionId);
+ }
+
/** io task (third task of pipeline) */
@SuppressWarnings("squid:S135")
private Runnable ioTask =
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
index 5ac5a3737f2..06e5632ae6a 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
@@ -271,9 +271,9 @@ public abstract class AbstractMetricService {
/** GetOrCreateCounter with internal report. */
public Counter getOrCreateCounterWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Counter counter = metricManager.getOrCreateCounter(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(counter, metric, tags);
+ internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
return counter;
}
@@ -287,69 +287,74 @@ public abstract class AbstractMetricService {
/** GetOrCreateGauge with internal report. */
public Gauge getOrCreateGaugeWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(gauge, metric, tags);
+ internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
return gauge;
}
/** GetOrCreateRate with internal report. */
public Rate getOrCreateRateWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(rate, metric, tags);
+ internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
return rate;
}
/** GetOrCreateHistogram with internal report. */
public Histogram getOrCreateHistogramWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Histogram histogram = metricManager.getOrCreateHistogram(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(histogram, metric, tags);
+ internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
return histogram;
}
/** GetOrCreateTimer with internal report. */
public Timer getOrCreateTimerWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(timer, metric, tags);
+ internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
return timer;
}
/** Count with internal report. */
public void countWithInternalReportAsync(
- long delta, String metric, MetricLevel metricLevel, String... tags) {
+ long delta, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.count(delta, metric, metricLevel, tags), metric, tags);
+ metricManager.count(delta, metric, metricLevel, tags), metric, time, tags);
}
/** Gauge value with internal report. */
public void gaugeWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.gauge(value, metric, metricLevel, tags), metric, tags);
+ metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags);
}
/** Rate with internal report. */
public void rateWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.rate(value, metric, metricLevel, tags), metric, tags);
+ metricManager.rate(value, metric, metricLevel, tags), metric, time, tags);
}
/** Histogram with internal report. */
public void histogramWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.histogram(value, metric, metricLevel, tags), metric, tags);
+ metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags);
}
/** Timer with internal report. */
public void timerWithInternalReportAsync(
- long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) {
+ long delta,
+ TimeUnit timeUnit,
+ String metric,
+ MetricLevel metricLevel,
+ long time,
+ String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags);
+ metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags);
}
public List<Pair<String, String[]>> getAllMetricKeys() {
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
index 2cc67f13a31..db1e915c8e9 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
@@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter {
*
* @param metric the target metric
* @param name the name of metric
+ * @param time the timestamp at which the metric values are written
* @param tags the tags of metric
*/
- public void writeMetricToIoTDB(IMetric metric, String name, String... tags) {
+ public void writeMetricToIoTDB(IMetric metric, String name, long time, String... tags) {
if (!(metric instanceof DoNothingMetric)) {
Map<String, Object> values = new HashMap<>();
metric.constructValueMap(values);
- writeMetricToIoTDB(
- values, IoTDBMetricsUtils.generatePath(name, tags), System.currentTimeMillis());
+ writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags), time);
}
}