You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/30 15:01:12 UTC
[incubator-iotdb] 01/01: fix page writer in statistics
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_statistics
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit adbd37eae3fdf64da42be56956a165d36c3396c1
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Nov 30 23:00:00 2019 +0800
fix page writer in statistics
---
.../iotdb/tsfile/file/header/ChunkHeader.java | 5 --
.../iotdb/tsfile/file/header/PageHeader.java | 41 +++--------
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 3 +-
.../file/metadata/statistics/BinaryStatistics.java | 10 +--
.../file/metadata/statistics/Statistics.java | 6 +-
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 34 ++++-----
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 12 +--
.../apache/iotdb/tsfile/write/page/PageWriter.java | 85 +---------------------
.../tsfile/file/metadata/utils/TestHelper.java | 3 +-
9 files changed, 43 insertions(+), 156 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index e7cd634..09318aa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -78,11 +78,6 @@ public class ChunkHeader {
+ Integer.BYTES; // numOfPages
}
- private static int getSerializedSize(int measurementIdLength) {
- return measurementIdLength + Integer.BYTES + TSDataType.getSerializedSize() + Integer.BYTES
- + CompressionType.getSerializedSize() + TSEncoding.getSerializedSize();
- }
-
/**
* deserialize from inputStream.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 08e6076..bc387b3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -31,48 +31,31 @@ public class PageHeader {
private int uncompressedSize;
private int compressedSize;
- private int numOfValues;
- private long endTime;
- private long startTime;
private Statistics statistics;
- public PageHeader(int uncompressedSize, int compressedSize, int numOfValues,
- Statistics statistics, long endTime, long startTime) {
+ public PageHeader(int uncompressedSize, int compressedSize, Statistics statistics) {
this.uncompressedSize = uncompressedSize;
this.compressedSize = compressedSize;
- this.numOfValues = numOfValues;
this.statistics = statistics;
- this.endTime = endTime;
- this.startTime = startTime;
}
-
public static int calculatePageHeaderSizeWithoutStatistics() {
- return 3 * Integer.BYTES // uncompressedSize, compressedSize, numOfValues
- + 2 * Long.BYTES; // maxTimestamp, minTimestamp
+ return 2 * Integer.BYTES; // uncompressedSize, compressedSize
}
public static PageHeader deserializeFrom(InputStream inputStream, TSDataType dataType)
throws IOException {
int uncompressedSize = ReadWriteIOUtils.readInt(inputStream);
int compressedSize = ReadWriteIOUtils.readInt(inputStream);
- int numOfValues = ReadWriteIOUtils.readInt(inputStream);
- long maxTimestamp = ReadWriteIOUtils.readLong(inputStream);
- long minTimestamp = ReadWriteIOUtils.readLong(inputStream);
Statistics statistics = Statistics.deserialize(inputStream, dataType);
- return new PageHeader(uncompressedSize, compressedSize, numOfValues, statistics, maxTimestamp,
- minTimestamp);
+ return new PageHeader(uncompressedSize, compressedSize, statistics);
}
public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
int compressedSize = ReadWriteIOUtils.readInt(buffer);
- int numOfValues = ReadWriteIOUtils.readInt(buffer);
- long maxTimestamp = ReadWriteIOUtils.readLong(buffer);
- long minTimestamp = ReadWriteIOUtils.readLong(buffer);
Statistics statistics = Statistics.deserialize(buffer, dataType);
- return new PageHeader(uncompressedSize, compressedSize, numOfValues, statistics, maxTimestamp,
- minTimestamp);
+ return new PageHeader(uncompressedSize, compressedSize, statistics);
}
public int getUncompressedSize() {
@@ -91,8 +74,8 @@ public class PageHeader {
this.compressedSize = compressedSize;
}
- public int getNumOfValues() {
- return numOfValues;
+ public long getNumOfValues() {
+ return statistics.getCount();
}
public Statistics getStatistics() {
@@ -100,28 +83,22 @@ public class PageHeader {
}
public long getEndTime() {
- return endTime;
+ return statistics.getEndTime();
}
public long getStartTime() {
- return startTime;
+ return statistics.getStartTime();
}
public void serializeTo(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(uncompressedSize, outputStream);
ReadWriteIOUtils.write(compressedSize, outputStream);
- ReadWriteIOUtils.write(numOfValues, outputStream);
- ReadWriteIOUtils.write(endTime, outputStream);
- ReadWriteIOUtils.write(startTime, outputStream);
statistics.serialize(outputStream);
}
@Override
public String toString() {
return "PageHeader{" + "uncompressedSize=" + uncompressedSize + ", compressedSize="
- + compressedSize
- + ", numOfValues=" + numOfValues + ", statistics=" + statistics + ", endTime="
- + endTime
- + ", startTime=" + startTime + '}';
+ + compressedSize + ", statistics=" + statistics + "}";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index 4999f23..0bdc849 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -80,7 +80,8 @@ public class ChunkMetaData {
@Override
public String toString() {
- return String.format("numPoints %d", statistics.getCount());
+ return String.format("measurementId: %s, datatype: %s, version: %d, deletedAt: %d, "
+ + "Statistics: %s", measurementUid, tsDataType, version, deletedAt, statistics);
}
public long getNumOfPoints() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index b54e7f1..45c5ef9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -177,16 +177,14 @@ public class BinaryStatistics extends Statistics<Binary> {
@Override
void deserialize(InputStream inputStream) throws IOException {
- this.firstValue = new Binary(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream));
- this.lastValue = new Binary(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream));
+ this.firstValue = ReadWriteIOUtils.readBinary(inputStream);
+ this.lastValue = ReadWriteIOUtils.readBinary(inputStream);
}
@Override
void deserialize(ByteBuffer byteBuffer) {
- this.firstValue = new Binary(
- ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(byteBuffer).array());
- this.lastValue = new Binary(
- ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(byteBuffer).array());
+ this.firstValue = ReadWriteIOUtils.readBinary(byteBuffer);
+ this.lastValue = ReadWriteIOUtils.readBinary(byteBuffer);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 5ece330..a0fd5de 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -145,11 +145,7 @@ public abstract class Statistics<T> {
* @throws StatisticsClassException cannot merge statistics
*/
public void mergeStatistics(Statistics<?> stats) {
- if (stats == null) {
- LOG.warn("tsfile-file parameter stats is null");
- return;
- }
- if (this.getClass() == stats.getClass() && !stats.isEmpty) {
+ if (this.getClass() == stats.getClass()) {
if (stats.startTime < this.startTime) {
this.startTime = stats.startTime;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 0a67b17..1d7bb5d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -192,16 +192,15 @@ public class ReadWriteIOUtils {
return bytes.length;
}
- public static int write(byte[] bytes, OutputStream outputStream) throws IOException {
- outputStream.write(bytes);
- return bytes.length;
- }
+ /**
+ * write the size (int) of the binary and then the bytes in binary
+ */
public static int write(Binary binary, OutputStream outputStream) throws IOException {
- byte[] bytes = BytesUtils.intToBytes(binary.getValues().length);
- outputStream.write(bytes);
+ byte[] size = BytesUtils.intToBytes(binary.getValues().length);
+ outputStream.write(size);
outputStream.write(binary.getValues());
- return bytes.length + binary.getValues().length;
+ return size.length + binary.getValues().length;
}
/**
@@ -567,17 +566,16 @@ public class ReadWriteIOUtils {
return readBytes(inputStream, length);
}
- /**
- * read bytes from inputStream, this method makes sure that you can read length bytes or reach to
- * the end of the stream.
- */
- public static ByteBuffer readByteBufferWithSelfDescriptionLength(InputStream inputStream)
- throws IOException {
- byte[] bytes = readBytesWithSelfDescriptionLength(inputStream);
- ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
- byteBuffer.put(bytes);
- byteBuffer.flip();
- return byteBuffer;
+ public static Binary readBinary(ByteBuffer buffer) {
+ int length = readInt(buffer);
+ byte[] bytes = readBytes(buffer, length);
+ return new Binary(bytes);
+ }
+
+ public static Binary readBinary(InputStream inputStream) throws IOException {
+ int length = readInt(inputStream);
+ byte[] bytes = readBytes(inputStream, length);
+ return new Binary(bytes);
}
/**
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index e6b3298..486cf41 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -75,7 +75,7 @@ public class ChunkWriterImpl implements IChunkWriter {
/**
* statistic of this chunk.
*/
- private Statistics<?> chunkStatistics;
+ private Statistics<?> statistics;
/**
* @param schema schema of this measurement
@@ -92,7 +92,7 @@ public class ChunkWriterImpl implements IChunkWriter {
this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
// init statistics for this chunk and page
- this.chunkStatistics = Statistics.getStatsByType(measurementSchema.getType());
+ this.statistics = Statistics.getStatsByType(measurementSchema.getType());
this.pageWriter = new PageWriter(measurementSchema);
this.pageWriter.setTimeEncoder(measurementSchema.getTimeEncoder());
@@ -205,7 +205,7 @@ public class ChunkWriterImpl implements IChunkWriter {
// update statistics of this chunk
numOfPages++;
- this.chunkStatistics.mergeStatistics(pageWriter.getStatistics());
+ this.statistics.mergeStatistics(pageWriter.getStatistics());
} catch (IOException e) {
logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
} finally {
@@ -217,11 +217,11 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
- writeAllPagesOfChunkToTsFile(tsfileWriter, chunkStatistics);
+ writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
// reinit this chunk writer
pageBuffer.reset();
- this.chunkStatistics = Statistics.getStatsByType(measurementSchema.getType());
+ this.statistics = Statistics.getStatsByType(measurementSchema.getType());
}
@Override
@@ -269,7 +269,7 @@ public class ChunkWriterImpl implements IChunkWriter {
logger.debug("finish to flush a page header {} of {} into buffer, buffer position {} ", header,
measurementSchema.getMeasurementId(), pageBuffer.size());
- chunkStatistics.mergeStatistics(header.getStatistics());
+ statistics.mergeStatistics(header.getStatistics());
} catch (IOException e) {
throw new PageException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 1203280..b7f1905 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -44,10 +44,6 @@ public class PageWriter {
private static final Logger logger = LoggerFactory.getLogger(PageWriter.class);
- // time of the latest written time value pair, we assume data is written in time order
- private long pageMaxTime;
- private long pageMinTime = Long.MIN_VALUE;
-
private ICompressor compressor;
// time
@@ -61,7 +57,6 @@ public class PageWriter {
* statistic of current page. It will be reset after calling {@code writePageHeaderAndDataIntoBuff()}
*/
private Statistics<?> statistics;
- private int pointNumber;
public PageWriter() {
this(null, null);
@@ -84,11 +79,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, boolean value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -98,7 +88,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, short value) {
- ++pointNumber;
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -108,11 +97,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, int value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -122,11 +106,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, long value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -136,11 +115,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, float value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -150,11 +124,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, double value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -164,11 +133,6 @@ public class PageWriter {
* write a time value pair into encoder
*/
public void write(long time, Binary value) {
- ++pointNumber;
- this.pageMaxTime = time;
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = time;
- }
timeEncoder.encode(time, timeOut);
valueEncoder.encode(value, valueOut);
statistics.update(time, value);
@@ -178,11 +142,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, boolean[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -194,11 +153,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, int[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -210,11 +164,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, long[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -226,11 +175,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, float[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -242,11 +186,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, double[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -258,11 +197,6 @@ public class PageWriter {
* write time series into encoder
*/
public void write(long[] timestamps, Binary[] values, int batchSize) {
- pointNumber += batchSize;
- this.pageMaxTime = timestamps[batchSize - 1];
- if (pageMinTime == Long.MIN_VALUE) {
- pageMinTime = timestamps[0];
- }
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
@@ -294,19 +228,12 @@ public class PageWriter {
return buffer;
}
- public long getPageMaxTime() {
- return pageMaxTime;
- }
-
- public long getPageMinTime() {
- return pageMinTime;
- }
/**
* write the page header and data into the PageWriter's output stream.
*/
public void writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer) throws IOException {
- if (pointNumber == 0) {
+ if (statistics.getCount() == 0) {
return;
}
@@ -327,8 +254,7 @@ public class PageWriter {
}
// write the page header to IOWriter
- PageHeader header = new PageHeader(uncompressedSize, compressedSize, pointNumber, statistics,
- pageMaxTime, pageMinTime);
+ PageHeader header = new PageHeader(uncompressedSize, compressedSize, statistics);
header.serializeTo(pageBuffer);
// write page content to temp PBAOS
@@ -360,9 +286,6 @@ public class PageWriter {
public void reset(MeasurementSchema measurementSchema) {
timeOut.reset();
valueOut.reset();
- pointNumber =0;
- pageMinTime = Long.MIN_VALUE;
- pageMaxTime = Long.MIN_VALUE;
statistics = Statistics.getStatsByType(measurementSchema.getType());
}
@@ -378,8 +301,8 @@ public class PageWriter {
statistics = Statistics.getStatsByType(dataType);
}
- public int getPointNumber(){
- return pointNumber;
+ public long getPointNumber(){
+ return statistics.getCount();
}
public Statistics<?> getStatistics(){
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
index a81aa30..feab49a 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
@@ -69,7 +69,6 @@ public class TestHelper {
Statistics<?> statistics = Statistics.getStatsByType(PageHeaderTest.DATA_TYPE);
statistics.setEmpty(false);
return new PageHeader(PageHeaderTest.UNCOMPRESSED_SIZE,
- PageHeaderTest.COMPRESSED_SIZE, PageHeaderTest.NUM_OF_VALUES,
- statistics, PageHeaderTest.MAX_TIMESTAMO, PageHeaderTest.MIN_TIMESTAMO);
+ PageHeaderTest.COMPRESSED_SIZE, statistics);
}
}