You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/11/18 11:32:39 UTC

[iotdb] 09/09: refactor statistics

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch improvedAlign_for_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 600450285eb58533f2f7254cb4030b19b5479922
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Nov 18 19:32:55 2022 +0800

    refactor statistics
---
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  |  3 +-
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |  3 +-
 .../tsfile/write/monitor/WriteStatistics.java      | 93 ++++++++++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java | 81 ++++++++++---------
 .../iotdb/tsfile/write/page/TimePageWriter.java    | 19 ++---
 .../iotdb/tsfile/write/page/ValuePageWriter.java   | 52 +++++++-----
 .../tsfile/write/writer/TimePageWriterTest.java    |  8 +-
 .../tsfile/write/writer/ValuePageWriterTest.java   | 18 +++--
 8 files changed, 197 insertions(+), 80 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index bd37840d79..39d324037f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -94,7 +94,8 @@ public class TimeChunkWriter {
     // init statistics for this chunk and page
     this.statistics = new TimeStatistics();
 
-    this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+    this.pageWriter =
+        new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType), measurementId);
   }
 
   public void write(long time) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 4ba7cba079..9df5b85d71 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -98,7 +98,8 @@ public class ValueChunkWriter {
     this.statistics = Statistics.getStatsByType(dataType);
 
     this.pageWriter =
-        new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
+        new ValuePageWriter(
+            valueEncoder, ICompressor.getCompressor(compressionType), dataType, measurementId);
   }
 
   public void write(long time, long value, boolean isNull) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java
new file mode 100644
index 0000000000..e50fc0e92e
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.monitor;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class WriteStatistics {
+  public static final WriteStatistics INSTANCE = new WriteStatistics();
+  public static final String LABEL_TIME = "time";
+  public static final String LABEL_VALUE = "value";
+
+  private final Map<String, Statistic> statisticMap = new ConcurrentHashMap<>();
+
+  private WriteStatistics() {}
+
+  public void dump(String filePath) throws IOException {
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
+      for (Entry<String, Statistic> entry : statisticMap.entrySet()) {
+        String label = entry.getKey();
+        Statistic statistic = entry.getValue();
+        writer.write(
+            String.format(
+                "%s,%d,%d,%d",
+                label,
+                statistic.rawSize.get(),
+                statistic.encodedSize.get(),
+                statistic.compressedSize.get()));
+        writer.newLine();
+      }
+    }
+  }
+
+  public void update(String measurementId, long timeSize, long valueSize, StatisticType type) {
+    WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_TIME, timeSize, type);
+    WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_VALUE, valueSize, type);
+    WriteStatistics.INSTANCE.update(
+        measurementId + "-" + WriteStatistics.LABEL_TIME, timeSize, type);
+    WriteStatistics.INSTANCE.update(
+        measurementId + "-" + WriteStatistics.LABEL_VALUE, valueSize, type);
+  }
+
+  public void update(String label, long delta, StatisticType type) {
+    statisticMap.computeIfAbsent(label, l -> new Statistic()).update(delta, type);
+  }
+
+  private static class Statistic {
+    private final AtomicLong rawSize = new AtomicLong();
+    private final AtomicLong encodedSize = new AtomicLong();
+    private final AtomicLong compressedSize = new AtomicLong();
+
+    private void update(long delta, StatisticType type) {
+      switch (type) {
+        case rawSize:
+          rawSize.addAndGet(delta);
+          break;
+        case encodedSize:
+          encodedSize.addAndGet(delta);
+          break;
+        case compressedSize:
+          compressedSize.addAndGet(delta);
+          break;
+      }
+    }
+  }
+
+  public enum StatisticType {
+    rawSize,
+    encodedSize,
+    compressedSize;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 097f81edbf..711f20b757 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import org.slf4j.Logger;
@@ -36,7 +38,6 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This writer is used to write time-value into a page. It consists of a time encoder, a value
@@ -44,15 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class PageWriter {
 
-  public static final AtomicLong timeRawSize = new AtomicLong();
-  public static final AtomicLong timeEncodedSize = new AtomicLong();
-  public static final AtomicLong valueRawSize = new AtomicLong();
-  public static final AtomicLong valueEncodedSize = new AtomicLong();
-  public static final AtomicLong compressedSize = new AtomicLong();
-
   private static final Logger logger = LoggerFactory.getLogger(PageWriter.class);
 
   private ICompressor compressor;
+  private String measurementId;
 
   // time
   private Encoder timeEncoder;
@@ -75,6 +71,7 @@ public class PageWriter {
     this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder());
     this.statistics = Statistics.getStatsByType(measurementSchema.getType());
     this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
+    this.measurementId = measurementSchema.getMeasurementId();
   }
 
   private PageWriter(Encoder timeEncoder, Encoder valueEncoder) {
@@ -89,8 +86,7 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(1);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Byte.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -98,8 +94,7 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(Short.BYTES);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Short.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -107,8 +102,8 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(Integer.BYTES);
+    WriteStatistics.INSTANCE.update(
+        measurementId, Long.BYTES, Integer.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -116,8 +111,7 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(Long.BYTES);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Long.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -125,8 +119,7 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(Float.BYTES);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Float.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -134,8 +127,7 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(Double.BYTES);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Double.BYTES, StatisticType.rawSize);
   }
 
   /** write a time value pair into encoder */
@@ -143,8 +135,8 @@ public class PageWriter {
     timeEncoder.encode(time, timeOut);
     valueEncoder.encode(value, valueOut);
     statistics.update(time, value);
-    timeRawSize.addAndGet(Long.BYTES);
-    valueRawSize.addAndGet(value.getLength());
+    WriteStatistics.INSTANCE.update(
+        measurementId, Long.BYTES, value.getLength(), StatisticType.rawSize);
   }
 
   /** write time series into encoder */
@@ -153,8 +145,8 @@ public class PageWriter {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
-    timeRawSize.addAndGet(Long.BYTES * batchSize);
-    valueRawSize.addAndGet(batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, (long) Long.BYTES * batchSize, batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -164,8 +156,11 @@ public class PageWriter {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
-    timeRawSize.addAndGet(Long.BYTES * batchSize);
-    valueRawSize.addAndGet(Integer.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId,
+        (long) Long.BYTES * batchSize,
+        (long) Integer.BYTES * batchSize,
+        StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -175,8 +170,11 @@ public class PageWriter {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
-    timeRawSize.addAndGet(Long.BYTES * batchSize);
-    valueRawSize.addAndGet(Long.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId,
+        (long) Long.BYTES * batchSize,
+        (long) Long.BYTES * batchSize,
+        StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -186,8 +184,11 @@ public class PageWriter {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
-    timeRawSize.addAndGet(Long.BYTES * batchSize);
-    valueRawSize.addAndGet(Float.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId,
+        (long) Long.BYTES * batchSize,
+        (long) Float.BYTES * batchSize,
+        StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -197,8 +198,11 @@ public class PageWriter {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
-    timeRawSize.addAndGet(Long.BYTES * batchSize);
-    valueRawSize.addAndGet(Double.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId,
+        (long) Long.BYTES * batchSize,
+        (long) Double.BYTES * batchSize,
+        StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -207,8 +211,8 @@ public class PageWriter {
     for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
-      timeRawSize.addAndGet(Long.BYTES);
-      valueRawSize.addAndGet(values[i].getLength());
+      WriteStatistics.INSTANCE.update(
+          measurementId, Long.BYTES, values[i].getLength(), StatisticType.rawSize);
     }
     statistics.update(timestamps, values, batchSize);
   }
@@ -220,8 +224,8 @@ public class PageWriter {
   }
 
   /**
-   * getUncompressedBytes return data what it has been written in form of <code>
-   * size of time list, time list, value list</code>
+   * getUncompressedBytes return data what it has been written in form of <code> size of time list,
+   * time list, value list</code>
    *
    * @return a new readable ByteBuffer whose position is 0.
    */
@@ -232,8 +236,8 @@ public class PageWriter {
     buffer.put(timeOut.getBuf(), 0, timeOut.size());
     buffer.put(valueOut.getBuf(), 0, valueOut.size());
     buffer.flip();
-    timeEncodedSize.addAndGet(timeOut.size());
-    valueEncodedSize.addAndGet(valueOut.size());
+    WriteStatistics.INSTANCE.update(
+        measurementId, timeOut.size(), valueOut.size(), StatisticType.encodedSize);
     return buffer;
   }
 
@@ -262,7 +266,8 @@ public class PageWriter {
           compressor.compress(
               pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
     }
-    PageWriter.compressedSize.addAndGet(compressedSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, compressedSize, compressedSize, StatisticType.compressedSize);
 
     // write the page header to IOWriter
     int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
index 1d1cd8369f..23c1b197bc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This writer is used to write time into a page. It consists of a time encoder and respective
@@ -41,10 +42,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TimePageWriter {
 
   private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class);
-  public static final AtomicLong timeRawSize = new AtomicLong();
-  public static final AtomicLong timeEncodedSize = new AtomicLong();
-  public static final AtomicLong timeCompressedSize = new AtomicLong();
 
+  private final String measurementId;
   private final ICompressor compressor;
 
   // time
@@ -57,18 +56,19 @@ public class TimePageWriter {
    */
   private TimeStatistics statistics;
 
-  public TimePageWriter(Encoder timeEncoder, ICompressor compressor) {
+  public TimePageWriter(Encoder timeEncoder, ICompressor compressor, String measurementId) {
     this.timeOut = new PublicBAOS();
     this.timeEncoder = timeEncoder;
     this.statistics = new TimeStatistics();
     this.compressor = compressor;
+    this.measurementId = measurementId;
   }
 
   /** write a time into encoder */
   public void write(long time) {
     timeEncoder.encode(time, timeOut);
     statistics.update(time);
-    timeRawSize.addAndGet(Long.BYTES);
+    WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.rawSize);
   }
 
   /** write time series into encoder */
@@ -76,7 +76,8 @@ public class TimePageWriter {
     for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
     }
-    timeRawSize.addAndGet(batchSize * Long.BYTES);
+    WriteStatistics.INSTANCE.update(
+        measurementId, (long) Long.BYTES * batchSize, 0, StatisticType.rawSize);
     statistics.update(timestamps, batchSize);
   }
 
@@ -96,7 +97,7 @@ public class TimePageWriter {
     ByteBuffer buffer = ByteBuffer.allocate(timeOut.size());
     buffer.put(timeOut.getBuf(), 0, timeOut.size());
     buffer.flip();
-    timeEncodedSize.addAndGet(timeOut.size());
+    WriteStatistics.INSTANCE.update(measurementId, timeOut.size(), 0, StatisticType.encodedSize);
     return buffer;
   }
 
@@ -125,7 +126,7 @@ public class TimePageWriter {
           compressor.compress(
               pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
     }
-    timeCompressedSize.addAndGet(compressedSize);
+    WriteStatistics.INSTANCE.update(measurementId, compressedSize, 0, StatisticType.compressedSize);
 
     // write the page header to IOWriter
     int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
index aca4a450ad..b6c252d221 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +37,6 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This writer is used to write value into a page. It consists of a value encoder and respective
@@ -43,10 +44,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class ValuePageWriter {
 
-  public static final AtomicLong valueRawSize = new AtomicLong();
-  public static final AtomicLong valueEncodedSize = new AtomicLong();
-  public static final AtomicLong valueCompressedSize = new AtomicLong();
-
   private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class);
 
   private final ICompressor compressor;
@@ -67,9 +64,11 @@ public class ValuePageWriter {
 
   private final PublicBAOS bitmapOut;
 
+  private String measurementId;
   private static final int MASK = 1 << 7;
 
-  public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) {
+  public ValuePageWriter(
+      Encoder valueEncoder, ICompressor compressor, TSDataType dataType, String measurementId) {
     this.valueOut = new PublicBAOS();
     this.bitmap = 0;
     this.size = 0;
@@ -77,13 +76,15 @@ public class ValuePageWriter {
     this.valueEncoder = valueEncoder;
     this.statistics = Statistics.getStatsByType(dataType);
     this.compressor = compressor;
+    this.measurementId = measurementId;
   }
 
   /** write a time value pair into encoder */
   public void write(long time, boolean value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(1);
+      WriteStatistics.INSTANCE.update(measurementId, 0, 1, StatisticType.rawSize);
+      WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.encodedSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -93,7 +94,7 @@ public class ValuePageWriter {
   public void write(long time, short value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(Short.BYTES);
+      WriteStatistics.INSTANCE.update(measurementId, 0, Short.BYTES, StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -103,7 +104,7 @@ public class ValuePageWriter {
   public void write(long time, int value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(Integer.BYTES);
+      WriteStatistics.INSTANCE.update(measurementId, 0, Integer.BYTES, StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -113,7 +114,7 @@ public class ValuePageWriter {
   public void write(long time, long value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(Long.BYTES);
+      WriteStatistics.INSTANCE.update(measurementId, 0, Long.BYTES, StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -123,7 +124,7 @@ public class ValuePageWriter {
   public void write(long time, float value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(Float.BYTES);
+      WriteStatistics.INSTANCE.update(measurementId, 0, Float.BYTES, StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -133,7 +134,7 @@ public class ValuePageWriter {
   public void write(long time, double value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(Double.BYTES);
+      WriteStatistics.INSTANCE.update(measurementId, 0, Double.BYTES, StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -143,7 +144,7 @@ public class ValuePageWriter {
   public void write(long time, Binary value, boolean isNull) {
     setBit(isNull);
     if (!isNull) {
-      valueRawSize.addAndGet(value.getLength());
+      WriteStatistics.INSTANCE.update(measurementId, 0, value.getLength(), StatisticType.rawSize);
       valueEncoder.encode(value, valueOut);
       statistics.update(time, value);
     }
@@ -165,7 +166,7 @@ public class ValuePageWriter {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
     }
-    valueRawSize.addAndGet(batchSize);
+    WriteStatistics.INSTANCE.update(measurementId, 0, batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -174,7 +175,8 @@ public class ValuePageWriter {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
     }
-    valueRawSize.addAndGet(Integer.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, 0, Integer.BYTES * batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -183,7 +185,8 @@ public class ValuePageWriter {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
     }
-    valueRawSize.addAndGet(Long.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, 0, (long) Long.BYTES * batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -192,7 +195,8 @@ public class ValuePageWriter {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
     }
-    valueRawSize.addAndGet(Float.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, 0, Float.BYTES * batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -201,7 +205,8 @@ public class ValuePageWriter {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
     }
-    valueRawSize.addAndGet(Double.BYTES * batchSize);
+    WriteStatistics.INSTANCE.update(
+        measurementId, 0, Double.BYTES * batchSize, StatisticType.rawSize);
     statistics.update(timestamps, values, batchSize);
   }
 
@@ -209,7 +214,8 @@ public class ValuePageWriter {
   public void write(long[] timestamps, Binary[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
       valueEncoder.encode(values[i], valueOut);
-      valueRawSize.addAndGet(values[i].getLength());
+      WriteStatistics.INSTANCE.update(
+          measurementId, 0, values[i].getLength(), StatisticType.rawSize);
     }
     statistics.update(timestamps, values, batchSize);
   }
@@ -230,7 +236,11 @@ public class ValuePageWriter {
    */
   public ByteBuffer getUncompressedBytes() throws IOException {
     prepareEndWriteOnePage();
-    valueEncodedSize.addAndGet(Integer.BYTES + bitmapOut.size() + valueOut.size());
+    WriteStatistics.INSTANCE.update(
+        measurementId,
+        0,
+        Integer.BYTES + bitmapOut.size() + valueOut.size(),
+        StatisticType.encodedSize);
     ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size());
     buffer.putInt(size);
     buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size());
@@ -271,7 +281,7 @@ public class ValuePageWriter {
           compressor.compress(
               pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
     }
-    valueCompressedSize.addAndGet(compressedSize);
+    WriteStatistics.INSTANCE.update(measurementId, 0, compressedSize, StatisticType.compressedSize);
 
     // write the page header to IOWriter
     int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
index cab975c9ab..a28f4d2619 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@ -45,7 +45,7 @@ public class TimePageWriterTest {
   public void testWrite() {
     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
     try {
       pageWriter.write(1L);
       assertEquals(8, pageWriter.estimateMaxMemSize());
@@ -68,7 +68,7 @@ public class TimePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       pageWriter.write(1L);
@@ -99,7 +99,7 @@ public class TimePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       pageWriter.write(1L);
@@ -135,7 +135,7 @@ public class TimePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
-    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       pageWriter.write(1L);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
index a43159fba1..fc6cc272fa 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
@@ -46,7 +46,8 @@ public class ValuePageWriterTest {
   public void testWrite1() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     try {
       pageWriter.write(1L, 1.0f, false);
       assertEquals(9, pageWriter.estimateMaxMemSize());
@@ -69,7 +70,8 @@ public class ValuePageWriterTest {
   public void testWrite2() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     try {
       for (int time = 1; time <= 16; time++) {
         pageWriter.write(time, (float) time, time % 4 == 0);
@@ -99,7 +101,8 @@ public class ValuePageWriterTest {
   public void testWrite3() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     try {
       for (int time = 1; time <= 20; time++) {
         pageWriter.write(time, (float) time, time % 4 == 0);
@@ -130,7 +133,8 @@ public class ValuePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       for (int time = 1; time <= 20; time++) {
@@ -178,7 +182,8 @@ public class ValuePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       for (int time = 1; time <= 20; time++) {
@@ -237,7 +242,8 @@ public class ValuePageWriterTest {
   public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
     Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
     ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
-    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    ValuePageWriter pageWriter =
+        new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
     PublicBAOS publicBAOS = new PublicBAOS();
     try {
       for (int time = 1; time <= 20; time++) {