You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/01 01:06:48 UTC
[iotdb] branch master updated: [IOTDB-4738]TsFile damaged after writing empty value pages (#7827)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 521f9b5f8e [IOTDB-4738]TsFile damaged after writing empty value pages (#7827)
521f9b5f8e is described below
commit 521f9b5f8e770031eb364dd10b9485099dbe6b7c
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Nov 1 09:06:41 2022 +0800
[IOTDB-4738]TsFile damaged after writing empty value pages (#7827)
---
.../apache/iotdb/tsfile/write/TsFileWriter.java | 4 +-
.../write/chunk/AlignedChunkGroupWriterImpl.java | 6 +-
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 4 +
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 1 +
.../tsfile/write/chunk/IChunkGroupWriter.java | 4 +-
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 1 +
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 12 +-
.../iotdb/tsfile/write/TsFileWriteApiTest.java | 264 +++++++++++++++++++++
8 files changed, 288 insertions(+), 8 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index ec2ef2baf5..2ab6780f18 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -308,7 +308,7 @@ public class TsFileWriter implements AutoCloseable {
}
private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
- throws WriteProcessException {
+ throws WriteProcessException, IOException {
// initial ChunkGroupWriter of this device in the TSRecord
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(record.deviceId, isAligned);
@@ -348,7 +348,7 @@ public class TsFileWriter implements AutoCloseable {
}
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
- throws WriteProcessException {
+ throws WriteProcessException, IOException {
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(tablet.deviceId, isAligned);
Path devicePath = new Path(tablet.deviceId);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 9a43b55b7b..9b6135349c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -68,7 +68,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
@Override
- public void tryToAddSeriesWriter(MeasurementSchema measurementSchema) {
+ public void tryToAddSeriesWriter(MeasurementSchema measurementSchema) throws IOException {
if (!valueChunkWriterMap.containsKey(measurementSchema.getMeasurementId())) {
ValueChunkWriter valueChunkWriter =
new ValueChunkWriter(
@@ -83,7 +83,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
@Override
- public void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) {
+ public void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) throws IOException {
for (MeasurementSchema schema : measurementSchemas) {
if (!valueChunkWriterMap.containsKey(schema.getMeasurementId())) {
ValueChunkWriter valueChunkWriter =
@@ -244,7 +244,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
return size;
}
- public void tryToAddEmptyPageAndData(ValueChunkWriter valueChunkWriter) {
+ public void tryToAddEmptyPageAndData(ValueChunkWriter valueChunkWriter) throws IOException {
// add empty page
for (int i = 0; i < timeChunkWriter.getNumOfPages(); i++) {
valueChunkWriter.writeEmptyPageToPageBuffer();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index f22909f268..29017293d5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -376,4 +376,8 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
public TSDataType getCurrentValueChunkType() {
return valueChunkWriterList.get(valueIndex).getDataType();
}
+
+ public ValueChunkWriter getValueChunkWriterByIndex(int valueIndex) {
+ return valueChunkWriterList.get(valueIndex);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d603c58efb..6ad3823518 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -314,6 +314,7 @@ public class ChunkWriterImpl implements IChunkWriter {
// reinit this chunk writer
pageBuffer.reset();
numOfPages = 0;
+ sizeWithoutStatistic = 0;
firstPageStatistics = null;
this.statistics = Statistics.getStatsByType(measurementSchema.getType());
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java
index 6f8cd1f687..94bee180b0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java
@@ -76,7 +76,7 @@ public interface IChunkGroupWriter {
*
* @param measurementSchema a measurement descriptor containing the message of the series
*/
- void tryToAddSeriesWriter(MeasurementSchema measurementSchema);
+ void tryToAddSeriesWriter(MeasurementSchema measurementSchema) throws IOException;
/**
* given a measurement descriptor list, create corresponding writers and put into this
@@ -84,7 +84,7 @@ public interface IChunkGroupWriter {
*
* @param measurementSchemas
*/
- void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas);
+ void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) throws IOException;
/**
* get the serialized size of current chunkGroup header + all chunks. Notice, the value does not
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index dc2f2d15f8..9e266e2d7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -227,6 +227,7 @@ public class TimeChunkWriter {
// reinit this chunk writer
pageBuffer.reset();
numOfPages = 0;
+ sizeWithoutStatistic = 0;
firstPageStatistics = null;
this.statistics = new TimeStatistics();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 98d67a6954..a9266e1085 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -154,7 +154,16 @@ public class ValueChunkWriter {
pageWriter.write(timestamps, values, isNull, batchSize, pos);
}
- public void writeEmptyPageToPageBuffer() {
+ public void writeEmptyPageToPageBuffer() throws IOException {
+ if (numOfPages == 1 && firstPageStatistics != null) {
+ // the only page so far is non-empty: insert its statistics at offset sizeWithoutStatistic
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ firstPageStatistics = null;
+ }
pageWriter.writeEmptyPageIntoBuff(pageBuffer);
numOfPages++;
}
@@ -250,6 +259,7 @@ public class ValueChunkWriter {
// reinit this chunk writer
pageBuffer.reset();
numOfPages = 0;
+ sizeWithoutStatistic = 0;
firstPageStatistics = null;
this.statistics = Statistics.getStatsByType(dataType);
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriteApiTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriteApiTest.java
index cafa4a0f63..7eb9fae547 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriteApiTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriteApiTest.java
@@ -20,14 +20,27 @@ package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
@@ -36,7 +49,9 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class TsFileWriteApiTest {
@@ -423,4 +438,253 @@ public class TsFileWriteApiTest {
Assert.fail("Meet errors in test: " + e.getMessage());
}
}
+
+ /** Write an empty page and then write a nonEmpty page. */
+ @Test
+ public void writeAlignedTimeseriesWithEmptyPage() throws IOException, WriteProcessException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ registerAlignedTimeseries(tsFileWriter);
+
+ List<MeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+ // example1
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(0));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(1));
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementScheams, 30, 0, 0, true);
+
+ // example2
+ writeMeasurementScheams.clear();
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(2));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(1));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(0));
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementScheams, 30, 1000, 500, true);
+
+ // example3 : late data
+ writeMeasurementScheams.clear();
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(2));
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementScheams, 60, 300000, 50, true);
+ }
+
+ TsFileReader tsFileReader = new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()));
+ for (int i = 0; i < 3; i++) {
+ QueryExpression queryExpression =
+ QueryExpression.create(
+ Collections.singletonList(
+ new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId())),
+ null);
+ QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
+
+ int cnt = 0;
+ while (queryDataSet.hasNext()) {
+ cnt++;
+ queryDataSet.next();
+ }
+ if (i < 2) {
+ Assert.assertEquals(60, cnt);
+ } else {
+ Assert.assertEquals(90, cnt);
+ }
+ }
+ }
+
+ /** Write a nonEmpty page and then write an empty page. */
+ @Test
+ public void writeAlignedTimeseriesWithEmptyPage2() throws IOException, WriteProcessException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ registerAlignedTimeseries(tsFileWriter);
+
+ List<MeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+ // example1
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(3));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(2));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(1));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(0));
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementScheams, 30, 0, 0, true);
+
+ // example2
+ writeMeasurementScheams.clear();
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(0));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(1));
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementScheams, 30, 1000, 500, true);
+ }
+
+ TsFileReader tsFileReader = new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()));
+ for (int i = 0; i < 3; i++) {
+ QueryExpression queryExpression =
+ QueryExpression.create(
+ Collections.singletonList(
+ new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId())),
+ null);
+ QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
+ int cnt = 0;
+ while (queryDataSet.hasNext()) {
+ cnt++;
+ queryDataSet.next();
+ }
+ if (i < 2) {
+ Assert.assertEquals(60, cnt);
+ } else {
+ Assert.assertEquals(30, cnt);
+ }
+ }
+ }
+
+ /** Write a nonEmpty page and then an empty page, directly via AlignedChunkWriterImpl. */
+ @Test
+ public void writeAlignedTimeseriesWithEmptyPage3() throws IOException, WriteProcessException {
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ registerAlignedTimeseries(tsFileWriter);
+
+ List<IMeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+ // example1
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(0));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(1));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(2));
+ writeMeasurementScheams.add(alignedMeasurementSchemas.get(3));
+
+ TsFileIOWriter tsFileIOWriter = tsFileWriter.getIOWriter();
+ tsFileIOWriter.startChunkGroup(deviceId);
+
+ AlignedChunkWriterImpl alignedChunkWriter =
+ new AlignedChunkWriterImpl(writeMeasurementScheams);
+
+ // write one nonEmpty page
+ for (long time = 0; time < 30; time++) {
+ for (int i = 0; i < 4; i++) {
+ alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
+ }
+ alignedChunkWriter.write(time);
+ }
+ alignedChunkWriter.sealCurrentPage();
+
+ // write a nonEmpty page of s0 and s1, an empty page of s2 and s3
+ for (long time = 30; time < 60; time++) {
+ for (int i = 0; i < 2; i++) {
+ alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
+ }
+ }
+ for (int i = 2; i < 4; i++) {
+ alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
+ }
+ for (long time = 30; time < 60; time++) {
+ alignedChunkWriter.write(time);
+ }
+ alignedChunkWriter.writeToFileWriter(tsFileIOWriter);
+ tsFileIOWriter.endChunkGroup();
+ }
+
+ // read file
+ TsFileReader tsFileReader = new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()));
+ for (int i = 0; i < 3; i++) {
+ QueryExpression queryExpression =
+ QueryExpression.create(
+ Collections.singletonList(
+ new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId())),
+ null);
+ QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
+ int cnt = 0;
+ while (queryDataSet.hasNext()) {
+ cnt++;
+ queryDataSet.next();
+ }
+ if (i < 2) {
+ Assert.assertEquals(60, cnt);
+ } else {
+ Assert.assertEquals(30, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void writeTsFileByFlushingPageDirectly() throws IOException, WriteProcessException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+
+ // create a tsfile with four pages in one timeseries
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+ registerTimeseries(tsFileWriter);
+
+ List<MeasurementSchema> writeMeasurementSchemas = new ArrayList<>();
+ writeMeasurementSchemas.add(measurementSchemas.get(0));
+
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementSchemas, 30, 0, 0, false);
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementSchemas, 30, 30, 30, false);
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementSchemas, 30, 60, 60, false);
+ TsFileGeneratorUtils.writeWithTsRecord(
+ tsFileWriter, deviceId, writeMeasurementSchemas, 30, 90, 90, false);
+ }
+
+ ChunkWriterImpl chunkWriter = new ChunkWriterImpl(measurementSchemas.get(0));
+
+ // rewrite the data into a new tsfile by flushing pages directly
+ File file = FSFactoryProducer.getFSFactory().getFile("test.tsfile");
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getAbsolutePath());
+ TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(file)) {
+ tsFileIOWriter.startChunkGroup(deviceId);
+ for (List<ChunkMetadata> chunkMetadatas :
+ reader.readChunkMetadataInDevice(deviceId).values()) {
+ for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ ByteBuffer chunkDataBuffer = chunk.getData();
+ ChunkHeader chunkHeader = chunk.getHeader();
+ int pageNum = 0;
+ while (chunkDataBuffer.remaining() > 0) {
+ // deserialize a PageHeader from chunkDataBuffer
+ PageHeader pageHeader;
+ if (((byte) (chunkHeader.getChunkType() & 0x3F))
+ == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+ }
+
+ // read compressed page data
+ int compressedPageBodyLength = pageHeader.getCompressedSize();
+ byte[] compressedPageBody = new byte[compressedPageBodyLength];
+ chunkDataBuffer.get(compressedPageBody);
+ chunkWriter.writePageHeaderAndDataIntoBuff(
+ ByteBuffer.wrap(compressedPageBody), pageHeader);
+ if (++pageNum % 2 == 0) {
+ chunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+ }
+ }
+ }
+ tsFileIOWriter.endChunkGroup();
+ tsFileIOWriter.endFile();
+
+ // read file
+ TsFileReader tsFileReader =
+ new TsFileReader(new TsFileSequenceReader(file.getAbsolutePath()));
+
+ QueryExpression queryExpression =
+ QueryExpression.create(
+ Collections.singletonList(
+ new Path(deviceId, measurementSchemas.get(0).getMeasurementId())),
+ null);
+ QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
+ int cnt = 0;
+ while (queryDataSet.hasNext()) {
+ cnt++;
+ // Assert.assertEquals(queryDataSet);
+ queryDataSet.next();
+ }
+
+ Assert.assertEquals(120, cnt);
+
+ } catch (Throwable throwable) {
+ if (file.exists()) {
+ file.delete();
+ }
+ throw throwable;
+ }
+ }
}