You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/11/18 11:32:39 UTC
[iotdb] 09/09: refactor statistics
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch improvedAlign_for_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 600450285eb58533f2f7254cb4030b19b5479922
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Nov 18 19:32:55 2022 +0800
refactor statistics
---
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 3 +-
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 3 +-
.../tsfile/write/monitor/WriteStatistics.java | 93 ++++++++++++++++++++++
.../apache/iotdb/tsfile/write/page/PageWriter.java | 81 ++++++++++---------
.../iotdb/tsfile/write/page/TimePageWriter.java | 19 ++---
.../iotdb/tsfile/write/page/ValuePageWriter.java | 52 +++++++-----
.../tsfile/write/writer/TimePageWriterTest.java | 8 +-
.../tsfile/write/writer/ValuePageWriterTest.java | 18 +++--
8 files changed, 197 insertions(+), 80 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index bd37840d79..39d324037f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -94,7 +94,8 @@ public class TimeChunkWriter {
// init statistics for this chunk and page
this.statistics = new TimeStatistics();
- this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+ this.pageWriter =
+ new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType), measurementId);
}
public void write(long time) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 4ba7cba079..9df5b85d71 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -98,7 +98,8 @@ public class ValueChunkWriter {
this.statistics = Statistics.getStatsByType(dataType);
this.pageWriter =
- new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
+ new ValuePageWriter(
+ valueEncoder, ICompressor.getCompressor(compressionType), dataType, measurementId);
}
public void write(long time, long value, boolean isNull) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java
new file mode 100644
index 0000000000..e50fc0e92e
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/monitor/WriteStatistics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.monitor;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class WriteStatistics {
+ public static final WriteStatistics INSTANCE = new WriteStatistics();
+ public static final String LABEL_TIME = "time";
+ public static final String LABEL_VALUE = "value";
+
+ private final Map<String, Statistic> statisticMap = new ConcurrentHashMap<>();
+
+ private WriteStatistics() {}
+
+ public void dump(String filePath) throws IOException {
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
+ for (Entry<String, Statistic> entry : statisticMap.entrySet()) {
+ String label = entry.getKey();
+ Statistic statistic = entry.getValue();
+ writer.write(
+ String.format(
+ "%s,%d,%d,%d",
+ label,
+ statistic.rawSize.get(),
+ statistic.encodedSize.get(),
+ statistic.compressedSize.get()));
+ writer.newLine();
+ }
+ }
+ }
+
+ public void update(String measurementId, long timeSize, long valueSize, StatisticType type) {
+ WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_TIME, timeSize, type);
+ WriteStatistics.INSTANCE.update(WriteStatistics.LABEL_VALUE, valueSize, type);
+ WriteStatistics.INSTANCE.update(
+ measurementId + "-" + WriteStatistics.LABEL_TIME, timeSize, type);
+ WriteStatistics.INSTANCE.update(
+ measurementId + "-" + WriteStatistics.LABEL_VALUE, valueSize, type);
+ }
+
+ public void update(String label, long delta, StatisticType type) {
+ statisticMap.computeIfAbsent(label, l -> new Statistic()).update(delta, type);
+ }
+
+ private static class Statistic {
+ private final AtomicLong rawSize = new AtomicLong();
+ private final AtomicLong encodedSize = new AtomicLong();
+ private final AtomicLong compressedSize = new AtomicLong();
+
+ private void update(long delta, StatisticType type) {
+ switch (type) {
+ case rawSize:
+ rawSize.addAndGet(delta);
+ break;
+ case encodedSize:
+ encodedSize.addAndGet(delta);
+ break;
+ case compressedSize:
+ compressedSize.addAndGet(delta);
+ break;
+ }
+ }
+ }
+
+ public enum StatisticType {
+ rawSize,
+ encodedSize,
+ compressedSize;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 097f81edbf..711f20b757 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
@@ -36,7 +38,6 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This writer is used to write time-value into a page. It consists of a time encoder, a value
@@ -44,15 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class PageWriter {
- public static final AtomicLong timeRawSize = new AtomicLong();
- public static final AtomicLong timeEncodedSize = new AtomicLong();
- public static final AtomicLong valueRawSize = new AtomicLong();
- public static final AtomicLong valueEncodedSize = new AtomicLong();
- public static final AtomicLong compressedSize = new AtomicLong();
-
private static final Logger logger = LoggerFactory.getLogger(PageWriter.class);
private ICompressor compressor;
+ private String measurementId;
// time
private Encoder timeEncoder;
@@ -75,6 +71,7 @@ public class PageWriter {
this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder());
this.statistics = Statistics.getStatsByType(measurementSchema.getType());
this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
+ this.measurementId = measurementSchema.getMeasurementId();
}
private PageWriter(Encoder timeEncoder, Encoder valueEncoder) {
@@ -89,8 +86,7 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(1);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Byte.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -98,8 +94,7 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(Short.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Short.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -107,8 +102,8 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(Integer.BYTES);
+ WriteStatistics.INSTANCE.update(
+ measurementId, Long.BYTES, Integer.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -116,8 +111,7 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(Long.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Long.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -125,8 +119,7 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(Float.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Float.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -134,8 +127,7 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(Double.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, Double.BYTES, StatisticType.rawSize);
}
/** write a time value pair into encoder */
@@ -143,8 +135,8 @@ public class PageWriter {
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(value.getLength());
+ WriteStatistics.INSTANCE.update(
+ measurementId, Long.BYTES, value.getLength(), StatisticType.rawSize);
}
/** write time series into encoder */
@@ -153,8 +145,8 @@ public class PageWriter {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
- timeRawSize.addAndGet(Long.BYTES * batchSize);
- valueRawSize.addAndGet(batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, (long) Long.BYTES * batchSize, batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -164,8 +156,11 @@ public class PageWriter {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
- timeRawSize.addAndGet(Long.BYTES * batchSize);
- valueRawSize.addAndGet(Integer.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId,
+ (long) Long.BYTES * batchSize,
+ (long) Integer.BYTES * batchSize,
+ StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -175,8 +170,11 @@ public class PageWriter {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
- timeRawSize.addAndGet(Long.BYTES * batchSize);
- valueRawSize.addAndGet(Long.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId,
+ (long) Long.BYTES * batchSize,
+ (long) Long.BYTES * batchSize,
+ StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -186,8 +184,11 @@ public class PageWriter {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
- timeRawSize.addAndGet(Long.BYTES * batchSize);
- valueRawSize.addAndGet(Float.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId,
+ (long) Long.BYTES * batchSize,
+ (long) Float.BYTES * batchSize,
+ StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -197,8 +198,11 @@ public class PageWriter {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
- timeRawSize.addAndGet(Long.BYTES * batchSize);
- valueRawSize.addAndGet(Double.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId,
+ (long) Long.BYTES * batchSize,
+ (long) Double.BYTES * batchSize,
+ StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -207,8 +211,8 @@ public class PageWriter {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
- timeRawSize.addAndGet(Long.BYTES);
- valueRawSize.addAndGet(values[i].getLength());
+ WriteStatistics.INSTANCE.update(
+ measurementId, Long.BYTES, values[i].getLength(), StatisticType.rawSize);
}
statistics.update(timestamps, values, batchSize);
}
@@ -220,8 +224,8 @@ public class PageWriter {
}
/**
- * getUncompressedBytes return data what it has been written in form of <code>
- * size of time list, time list, value list</code>
+ * getUncompressedBytes return data what it has been written in form of <code> size of time list,
+ * time list, value list</code>
*
* @return a new readable ByteBuffer whose position is 0.
*/
@@ -232,8 +236,8 @@ public class PageWriter {
buffer.put(timeOut.getBuf(), 0, timeOut.size());
buffer.put(valueOut.getBuf(), 0, valueOut.size());
buffer.flip();
- timeEncodedSize.addAndGet(timeOut.size());
- valueEncodedSize.addAndGet(valueOut.size());
+ WriteStatistics.INSTANCE.update(
+ measurementId, timeOut.size(), valueOut.size(), StatisticType.encodedSize);
return buffer;
}
@@ -262,7 +266,8 @@ public class PageWriter {
compressor.compress(
pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
}
- PageWriter.compressedSize.addAndGet(compressedSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, compressedSize, compressedSize, StatisticType.compressedSize);
// write the page header to IOWriter
int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
index 1d1cd8369f..23c1b197bc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This writer is used to write time into a page. It consists of a time encoder and respective
@@ -41,10 +42,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class TimePageWriter {
private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class);
- public static final AtomicLong timeRawSize = new AtomicLong();
- public static final AtomicLong timeEncodedSize = new AtomicLong();
- public static final AtomicLong timeCompressedSize = new AtomicLong();
+ private final String measurementId;
private final ICompressor compressor;
// time
@@ -57,18 +56,19 @@ public class TimePageWriter {
*/
private TimeStatistics statistics;
- public TimePageWriter(Encoder timeEncoder, ICompressor compressor) {
+ public TimePageWriter(Encoder timeEncoder, ICompressor compressor, String measurementId) {
this.timeOut = new PublicBAOS();
this.timeEncoder = timeEncoder;
this.statistics = new TimeStatistics();
this.compressor = compressor;
+ this.measurementId = measurementId;
}
/** write a time into encoder */
public void write(long time) {
timeEncoder.encode(time, timeOut);
statistics.update(time);
- timeRawSize.addAndGet(Long.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.rawSize);
}
/** write time series into encoder */
@@ -76,7 +76,8 @@ public class TimePageWriter {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
}
- timeRawSize.addAndGet(batchSize * Long.BYTES);
+ WriteStatistics.INSTANCE.update(
+ measurementId, (long) Long.BYTES * batchSize, 0, StatisticType.rawSize);
statistics.update(timestamps, batchSize);
}
@@ -96,7 +97,7 @@ public class TimePageWriter {
ByteBuffer buffer = ByteBuffer.allocate(timeOut.size());
buffer.put(timeOut.getBuf(), 0, timeOut.size());
buffer.flip();
- timeEncodedSize.addAndGet(timeOut.size());
+ WriteStatistics.INSTANCE.update(measurementId, timeOut.size(), 0, StatisticType.encodedSize);
return buffer;
}
@@ -125,7 +126,7 @@ public class TimePageWriter {
compressor.compress(
pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
}
- timeCompressedSize.addAndGet(compressedSize);
+ WriteStatistics.INSTANCE.update(measurementId, compressedSize, 0, StatisticType.compressedSize);
// write the page header to IOWriter
int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
index aca4a450ad..b6c252d221 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics;
+import org.apache.iotdb.tsfile.write.monitor.WriteStatistics.StatisticType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +37,6 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This writer is used to write value into a page. It consists of a value encoder and respective
@@ -43,10 +44,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ValuePageWriter {
- public static final AtomicLong valueRawSize = new AtomicLong();
- public static final AtomicLong valueEncodedSize = new AtomicLong();
- public static final AtomicLong valueCompressedSize = new AtomicLong();
-
private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class);
private final ICompressor compressor;
@@ -67,9 +64,11 @@ public class ValuePageWriter {
private final PublicBAOS bitmapOut;
+ private String measurementId;
private static final int MASK = 1 << 7;
- public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) {
+ public ValuePageWriter(
+ Encoder valueEncoder, ICompressor compressor, TSDataType dataType, String measurementId) {
this.valueOut = new PublicBAOS();
this.bitmap = 0;
this.size = 0;
@@ -77,13 +76,15 @@ public class ValuePageWriter {
this.valueEncoder = valueEncoder;
this.statistics = Statistics.getStatsByType(dataType);
this.compressor = compressor;
+ this.measurementId = measurementId;
}
/** write a time value pair into encoder */
public void write(long time, boolean value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(1);
+ WriteStatistics.INSTANCE.update(measurementId, 0, 1, StatisticType.rawSize);
+ WriteStatistics.INSTANCE.update(measurementId, Long.BYTES, 0, StatisticType.encodedSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -93,7 +94,7 @@ public class ValuePageWriter {
public void write(long time, short value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(Short.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, 0, Short.BYTES, StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -103,7 +104,7 @@ public class ValuePageWriter {
public void write(long time, int value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(Integer.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, 0, Integer.BYTES, StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -113,7 +114,7 @@ public class ValuePageWriter {
public void write(long time, long value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(Long.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, 0, Long.BYTES, StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -123,7 +124,7 @@ public class ValuePageWriter {
public void write(long time, float value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(Float.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, 0, Float.BYTES, StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -133,7 +134,7 @@ public class ValuePageWriter {
public void write(long time, double value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(Double.BYTES);
+ WriteStatistics.INSTANCE.update(measurementId, 0, Double.BYTES, StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -143,7 +144,7 @@ public class ValuePageWriter {
public void write(long time, Binary value, boolean isNull) {
setBit(isNull);
if (!isNull) {
- valueRawSize.addAndGet(value.getLength());
+ WriteStatistics.INSTANCE.update(measurementId, 0, value.getLength(), StatisticType.rawSize);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
}
@@ -165,7 +166,7 @@ public class ValuePageWriter {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
}
- valueRawSize.addAndGet(batchSize);
+ WriteStatistics.INSTANCE.update(measurementId, 0, batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -174,7 +175,8 @@ public class ValuePageWriter {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
}
- valueRawSize.addAndGet(Integer.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, 0, Integer.BYTES * batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -183,7 +185,8 @@ public class ValuePageWriter {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
}
- valueRawSize.addAndGet(Long.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, 0, (long) Long.BYTES * batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -192,7 +195,8 @@ public class ValuePageWriter {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
}
- valueRawSize.addAndGet(Float.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, 0, Float.BYTES * batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -201,7 +205,8 @@ public class ValuePageWriter {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
}
- valueRawSize.addAndGet(Double.BYTES * batchSize);
+ WriteStatistics.INSTANCE.update(
+ measurementId, 0, Double.BYTES * batchSize, StatisticType.rawSize);
statistics.update(timestamps, values, batchSize);
}
@@ -209,7 +214,8 @@ public class ValuePageWriter {
public void write(long[] timestamps, Binary[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
valueEncoder.encode(values[i], valueOut);
- valueRawSize.addAndGet(values[i].getLength());
+ WriteStatistics.INSTANCE.update(
+ measurementId, 0, values[i].getLength(), StatisticType.rawSize);
}
statistics.update(timestamps, values, batchSize);
}
@@ -230,7 +236,11 @@ public class ValuePageWriter {
*/
public ByteBuffer getUncompressedBytes() throws IOException {
prepareEndWriteOnePage();
- valueEncodedSize.addAndGet(Integer.BYTES + bitmapOut.size() + valueOut.size());
+ WriteStatistics.INSTANCE.update(
+ measurementId,
+ 0,
+ Integer.BYTES + bitmapOut.size() + valueOut.size(),
+ StatisticType.encodedSize);
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size());
buffer.putInt(size);
buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size());
@@ -271,7 +281,7 @@ public class ValuePageWriter {
compressor.compress(
pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
}
- valueCompressedSize.addAndGet(compressedSize);
+ WriteStatistics.INSTANCE.update(measurementId, 0, compressedSize, StatisticType.compressedSize);
// write the page header to IOWriter
int sizeWithoutStatistic = 0;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
index cab975c9ab..a28f4d2619 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@ -45,7 +45,7 @@ public class TimePageWriterTest {
public void testWrite() {
Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
try {
pageWriter.write(1L);
assertEquals(8, pageWriter.estimateMaxMemSize());
@@ -68,7 +68,7 @@ public class TimePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
pageWriter.write(1L);
@@ -99,7 +99,7 @@ public class TimePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
pageWriter.write(1L);
@@ -135,7 +135,7 @@ public class TimePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
- TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+ TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
pageWriter.write(1L);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
index a43159fba1..fc6cc272fa 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
@@ -46,7 +46,8 @@ public class ValuePageWriterTest {
public void testWrite1() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
try {
pageWriter.write(1L, 1.0f, false);
assertEquals(9, pageWriter.estimateMaxMemSize());
@@ -69,7 +70,8 @@ public class ValuePageWriterTest {
public void testWrite2() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
try {
for (int time = 1; time <= 16; time++) {
pageWriter.write(time, (float) time, time % 4 == 0);
@@ -99,7 +101,8 @@ public class ValuePageWriterTest {
public void testWrite3() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
try {
for (int time = 1; time <= 20; time++) {
pageWriter.write(time, (float) time, time % 4 == 0);
@@ -130,7 +133,8 @@ public class ValuePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
for (int time = 1; time <= 20; time++) {
@@ -178,7 +182,8 @@ public class ValuePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
for (int time = 1; time <= 20; time++) {
@@ -237,7 +242,8 @@ public class ValuePageWriterTest {
public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
- ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+ ValuePageWriter pageWriter =
+ new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT, "");
PublicBAOS publicBAOS = new PublicBAOS();
try {
for (int time = 1; time <= 20; time++) {