You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/03 03:00:31 UTC
[iotdb] 01/01: add check in flush
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DMB
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 17111a1397d9ad87346ccad19b95dc751f174e90
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Mar 3 10:59:25 2022 +0800
add check in flush
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 68 ++++++++++++++++++++--
.../iotdb/tsfile/file/header/PageHeader.java | 2 +-
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 48 +++++++++++++++
.../apache/iotdb/tsfile/write/page/PageWriter.java | 2 +
4 files changed, 113 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..829cf11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -236,11 +236,20 @@ public class MemTableFlushTask {
Pair<TVList, MeasurementSchema> encodingMessage =
(Pair<TVList, MeasurementSchema>) task;
IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
- writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+ try {
+ writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+ } catch (IllegalStateException e) {
+ LOGGER.error(
+ "IllegalStateException in encoding stage, dataType is: {}",
+ encodingMessage.right.getType(),
+ e);
+ printTVList(encodingMessage.left, encodingMessage.right.getType());
+ throw e;
+ }
seriesWriter.sealCurrentPage();
seriesWriter.clearPageWriter();
try {
- ioTaskQueue.put(seriesWriter);
+ ioTaskQueue.put(new Pair<>((ChunkWriterImpl) seriesWriter, encodingMessage.left));
} catch (InterruptedException e) {
LOGGER.error("Put task into ioTaskQueue Interrupted");
Thread.currentThread().interrupt();
@@ -285,13 +294,23 @@ public class MemTableFlushTask {
this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
} else if (ioMessage instanceof TaskEnd) {
break;
- } else if (ioMessage instanceof IChunkWriter) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
- chunkWriter.writeToFileWriter(this.writer);
- } else {
+ } else if (ioMessage instanceof EndChunkGroupIoTask) {
this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
this.writer.endChunkGroup();
+ } else {
+ Pair<ChunkWriterImpl, TVList> pair = (Pair<ChunkWriterImpl, TVList>) ioMessage;
+ ChunkWriterImpl chunkWriter = pair.left;
+ try {
+ chunkWriter.writeToFileWriter(this.writer);
+ } catch (IllegalStateException e) {
+ LOGGER.error(
+ "IllegalStateException in io stage, dataType is: {}",
+ chunkWriter.getDataType(),
+ e);
+ printTVList(pair.right, chunkWriter.getDataType());
+ throw e;
+ }
}
} catch (IOException e) {
LOGGER.error(
@@ -325,4 +344,41 @@ public class MemTableFlushTask {
this.deviceId = deviceId;
}
}
+ /** Writes the time/value pairs of tvList to the error log in one message; called when a flush stage throws. */
+ private static void printTVList(TVList tvList, TSDataType dataType) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < tvList.size(); i++) {
+ long time = tvList.getTime(i);
+ // When the next entry carries the same timestamp, this one is left out, so a run of
+ // adjacent equal timestamps contributes only its last entry to the output.
+ if ((i + 1 < tvList.size() && (time == tvList.getTime(i + 1)))) {
+ continue;
+ }
+
+ switch (dataType) {
+ case BOOLEAN:
+ builder.append("{").append(time).append(",").append(tvList.getBoolean(i)).append("}");
+ break;
+ case INT32:
+ builder.append("{").append(time).append(",").append(tvList.getInt(i)).append("}");
+ break;
+ case INT64:
+ builder.append("{").append(time).append(",").append(tvList.getLong(i)).append("}");
+ break;
+ case FLOAT:
+ builder.append("{").append(time).append(",").append(tvList.getFloat(i)).append("}");
+ break;
+ case DOUBLE:
+ builder.append("{").append(time).append(",").append(tvList.getDouble(i)).append("}");
+ break;
+ case TEXT:
+ builder.append("{").append(time).append(",").append(tvList.getBinary(i)).append("}");
+ break;
+ default:
+ LOGGER.error("does not support data type: {}", dataType); // logged per non-skipped entry; nothing is appended
+ break;
+ }
+ }
+ LOGGER.error("TVList is: {}", builder); // entries are concatenated with no separator between them
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 178a0a5..bfbbaaf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
public class PageHeader {
private int uncompressedSize;
- private int compressedSize;
+ public int compressedSize;
private Statistics statistics;
private boolean modified;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d44ee88..8f9611e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -40,6 +42,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
public class ChunkWriterImpl implements IChunkWriter {
@@ -308,7 +312,23 @@ public class ChunkWriterImpl implements IChunkWriter {
// update statistics of this chunk
numOfPages++;
+ if (this.statistics.getStartTime() == 0) {
+ logger.error(
+ "before merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+ this.statistics,
+ pageWriter.getStatistics());
+ throw new IllegalStateException("before merge statistics' startTime is 0");
+ }
+
this.statistics.mergeStatistics(pageWriter.getStatistics());
+
+ if (this.statistics.getStartTime() == 0) {
+ logger.error(
+ "after merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+ this.statistics,
+ pageWriter.getStatistics());
+ throw new IllegalStateException("after merge statistics' startTime is 0");
+ }
} catch (IOException e) {
logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
} finally {
@@ -449,6 +469,34 @@ public class ChunkWriterImpl implements IChunkWriter {
// write all pages of this column
writer.writeBytesToStream(pageBuffer);
+ ChunkHeader header =
+ new ChunkHeader(
+ measurementSchema.getMeasurementId(),
+ pageBuffer.size(),
+ measurementSchema.getType(),
+ compressor.getType(),
+ measurementSchema.getEncodingType(),
+ numOfPages);
+ try {
+ Chunk chunk =
+ new Chunk(
+ header,
+ ByteBuffer.wrap(pageBuffer.toByteArray()),
+ Collections.emptyList(),
+ statistics);
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ while (chunkReader.hasNextSatisfiedPage()) {
+ chunkReader.nextPageData();
+ }
+ } catch (Throwable e) {
+ logger.error(
+ "Verify chunk failed, chunk statistics: {}, chunk header: {}, chunk data: {}",
+ this.statistics,
+ header,
+ Arrays.toString(pageBuffer.toByteArray()));
+ throw new IllegalStateException("Verify chunk failed");
+ }
+
int dataSize = (int) (writer.getPos() - dataOffset);
if (dataSize != pageBuffer.size()) {
throw new IOException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index afa41c5..696d799 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -247,7 +247,9 @@ public class PageWriter {
channel.write(pageData);
}
} else {
+ int previousPosition = pageBuffer.size();
pageBuffer.write(compressedBytes, 0, compressedSize);
+ if (pageBuffer.size() - previousPosition == compressedSize) {}
}
logger.trace("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
return sizeWithoutStatistic;