You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/03 03:00:31 UTC

[iotdb] 01/01: add check in flush

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DMB
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 17111a1397d9ad87346ccad19b95dc751f174e90
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Mar 3 10:59:25 2022 +0800

    add check in flush
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 68 ++++++++++++++++++++--
 .../iotdb/tsfile/file/header/PageHeader.java       |  2 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 48 +++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |  2 +
 4 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..829cf11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -236,11 +236,20 @@ public class MemTableFlushTask {
               Pair<TVList, MeasurementSchema> encodingMessage =
                   (Pair<TVList, MeasurementSchema>) task;
               IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-              writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+              try {
+                writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+              } catch (IllegalStateException e) {
+                LOGGER.error(
+                    "IllegalStateException in encoding stage, dataType is: {}",
+                    encodingMessage.right.getType(),
+                    e);
+                printTVList(encodingMessage.left, encodingMessage.right.getType());
+                throw e;
+              }
               seriesWriter.sealCurrentPage();
               seriesWriter.clearPageWriter();
               try {
-                ioTaskQueue.put(seriesWriter);
+                ioTaskQueue.put(new Pair<>((ChunkWriterImpl) seriesWriter, encodingMessage.left));
               } catch (InterruptedException e) {
                 LOGGER.error("Put task into ioTaskQueue Interrupted");
                 Thread.currentThread().interrupt();
@@ -285,13 +294,23 @@ public class MemTableFlushTask {
               this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
             } else if (ioMessage instanceof TaskEnd) {
               break;
-            } else if (ioMessage instanceof IChunkWriter) {
-              ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-              chunkWriter.writeToFileWriter(this.writer);
-            } else {
+            } else if (ioMessage instanceof EndChunkGroupIoTask) {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
+            } else {
+              Pair<ChunkWriterImpl, TVList> pair = (Pair<ChunkWriterImpl, TVList>) ioMessage;
+              ChunkWriterImpl chunkWriter = pair.left;
+              try {
+                chunkWriter.writeToFileWriter(this.writer);
+              } catch (IllegalStateException e) {
+                LOGGER.error(
+                    "IllegalStateException in io stage, dataType is: {}",
+                    chunkWriter.getDataType(),
+                    e);
+                printTVList(pair.right, chunkWriter.getDataType());
+                throw e;
+              }
             }
           } catch (IOException e) {
             LOGGER.error(
@@ -325,4 +344,41 @@ public class MemTableFlushTask {
       this.deviceId = deviceId;
     }
   }
+
+  private static void printTVList(TVList tvList, TSDataType dataType) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < tvList.size(); i++) {
+      long time = tvList.getTime(i);
+
+      // skip duplicated data
+      if ((i + 1 < tvList.size() && (time == tvList.getTime(i + 1)))) {
+        continue;
+      }
+
+      switch (dataType) {
+        case BOOLEAN:
+          builder.append("{").append(time).append(",").append(tvList.getBoolean(i)).append("}");
+          break;
+        case INT32:
+          builder.append("{").append(time).append(",").append(tvList.getInt(i)).append("}");
+          break;
+        case INT64:
+          builder.append("{").append(time).append(",").append(tvList.getLong(i)).append("}");
+          break;
+        case FLOAT:
+          builder.append("{").append(time).append(",").append(tvList.getFloat(i)).append("}");
+          break;
+        case DOUBLE:
+          builder.append("{").append(time).append(",").append(tvList.getDouble(i)).append("}");
+          break;
+        case TEXT:
+          builder.append("{").append(time).append(",").append(tvList.getBinary(i)).append("}");
+          break;
+        default:
+          LOGGER.error("does not support data type: {}", dataType);
+          break;
+      }
+    }
+    LOGGER.error("TVList is: {}", builder);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 178a0a5..bfbbaaf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
 public class PageHeader {
 
   private int uncompressedSize;
-  private int compressedSize;
+  public int compressedSize;
   private Statistics statistics;
   private boolean modified;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d44ee88..8f9611e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -40,6 +42,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
 
 public class ChunkWriterImpl implements IChunkWriter {
 
@@ -308,7 +312,23 @@ public class ChunkWriterImpl implements IChunkWriter {
 
       // update statistics of this chunk
       numOfPages++;
+      if (this.statistics.getStartTime() == 0) {
+        logger.error(
+            "before merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+            this.statistics,
+            pageWriter.getStatistics());
+        throw new IllegalStateException("before merge statistics' startTime is 0");
+      }
+
       this.statistics.mergeStatistics(pageWriter.getStatistics());
+
+      if (this.statistics.getStartTime() == 0) {
+        logger.error(
+            "after merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+            this.statistics,
+            pageWriter.getStatistics());
+        throw new IllegalStateException("after merge statistics' startTime is 0");
+      }
     } catch (IOException e) {
       logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
     } finally {
@@ -449,6 +469,34 @@ public class ChunkWriterImpl implements IChunkWriter {
     // write all pages of this column
     writer.writeBytesToStream(pageBuffer);
 
+    ChunkHeader header =
+        new ChunkHeader(
+            measurementSchema.getMeasurementId(),
+            pageBuffer.size(),
+            measurementSchema.getType(),
+            compressor.getType(),
+            measurementSchema.getEncodingType(),
+            numOfPages);
+    try {
+      Chunk chunk =
+          new Chunk(
+              header,
+              ByteBuffer.wrap(pageBuffer.toByteArray()),
+              Collections.emptyList(),
+              statistics);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
+      while (chunkReader.hasNextSatisfiedPage()) {
+        chunkReader.nextPageData();
+      }
+    } catch (Throwable e) {
+      logger.error(
+          "Verify chunk failed, chunk statistics: {}, chunk header: {}, chunk data: {}",
+          this.statistics,
+          header,
+          Arrays.toString(pageBuffer.toByteArray()));
+      throw new IllegalStateException("Verify chunk failed");
+    }
+
     int dataSize = (int) (writer.getPos() - dataOffset);
     if (dataSize != pageBuffer.size()) {
       throw new IOException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index afa41c5..696d799 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -247,7 +247,9 @@ public class PageWriter {
         channel.write(pageData);
       }
     } else {
+      int previousPosition = pageBuffer.size();
       pageBuffer.write(compressedBytes, 0, compressedSize);
+      if (pageBuffer.size() - previousPosition == compressedSize) {}
     }
     logger.trace("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
     return sizeWithoutStatistic;