You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/03 03:00:30 UTC

[iotdb] branch DMB created (now 17111a1)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch DMB
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 17111a1  add check in flush

This branch includes the following new commits:

     new 17111a1  add check in flush

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: add check in flush

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DMB
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 17111a1397d9ad87346ccad19b95dc751f174e90
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Mar 3 10:59:25 2022 +0800

    add check in flush
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 68 ++++++++++++++++++++--
 .../iotdb/tsfile/file/header/PageHeader.java       |  2 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 48 +++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |  2 +
 4 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..829cf11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -236,11 +236,20 @@ public class MemTableFlushTask {
               Pair<TVList, MeasurementSchema> encodingMessage =
                   (Pair<TVList, MeasurementSchema>) task;
               IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-              writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+              try {
+                writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+              } catch (IllegalStateException e) {
+                LOGGER.error(
+                    "IllegalStateException in encoding stage, dataType is: {}",
+                    encodingMessage.right.getType(),
+                    e);
+                printTVList(encodingMessage.left, encodingMessage.right.getType());
+                throw e;
+              }
               seriesWriter.sealCurrentPage();
               seriesWriter.clearPageWriter();
               try {
-                ioTaskQueue.put(seriesWriter);
+                ioTaskQueue.put(new Pair<>((ChunkWriterImpl) seriesWriter, encodingMessage.left));
               } catch (InterruptedException e) {
                 LOGGER.error("Put task into ioTaskQueue Interrupted");
                 Thread.currentThread().interrupt();
@@ -285,13 +294,23 @@ public class MemTableFlushTask {
               this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
             } else if (ioMessage instanceof TaskEnd) {
               break;
-            } else if (ioMessage instanceof IChunkWriter) {
-              ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-              chunkWriter.writeToFileWriter(this.writer);
-            } else {
+            } else if (ioMessage instanceof EndChunkGroupIoTask) {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
+            } else {
+              Pair<ChunkWriterImpl, TVList> pair = (Pair<ChunkWriterImpl, TVList>) ioMessage;
+              ChunkWriterImpl chunkWriter = pair.left;
+              try {
+                chunkWriter.writeToFileWriter(this.writer);
+              } catch (IllegalStateException e) {
+                LOGGER.error(
+                    "IllegalStateException in io stage, dataType is: {}",
+                    chunkWriter.getDataType(),
+                    e);
+                printTVList(pair.right, chunkWriter.getDataType());
+                throw e;
+              }
             }
           } catch (IOException e) {
             LOGGER.error(
@@ -325,4 +344,41 @@ public class MemTableFlushTask {
       this.deviceId = deviceId;
     }
   }
+
+  private static void printTVList(TVList tvList, TSDataType dataType) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < tvList.size(); i++) {
+      long time = tvList.getTime(i);
+
+      // skip duplicated data
+      if ((i + 1 < tvList.size() && (time == tvList.getTime(i + 1)))) {
+        continue;
+      }
+
+      switch (dataType) {
+        case BOOLEAN:
+          builder.append("{").append(time).append(",").append(tvList.getBoolean(i)).append("}");
+          break;
+        case INT32:
+          builder.append("{").append(time).append(",").append(tvList.getInt(i)).append("}");
+          break;
+        case INT64:
+          builder.append("{").append(time).append(",").append(tvList.getLong(i)).append("}");
+          break;
+        case FLOAT:
+          builder.append("{").append(time).append(",").append(tvList.getFloat(i)).append("}");
+          break;
+        case DOUBLE:
+          builder.append("{").append(time).append(",").append(tvList.getDouble(i)).append("}");
+          break;
+        case TEXT:
+          builder.append("{").append(time).append(",").append(tvList.getBinary(i)).append("}");
+          break;
+        default:
+          LOGGER.error("does not support data type: {}", dataType);
+          break;
+      }
+    }
+    LOGGER.error("TVList is: {}", builder);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 178a0a5..bfbbaaf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
 public class PageHeader {
 
   private int uncompressedSize;
-  private int compressedSize;
+  public int compressedSize;
   private Statistics statistics;
   private boolean modified;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d44ee88..8f9611e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -40,6 +42,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collections;
 
 public class ChunkWriterImpl implements IChunkWriter {
 
@@ -308,7 +312,23 @@ public class ChunkWriterImpl implements IChunkWriter {
 
       // update statistics of this chunk
       numOfPages++;
+      if (this.statistics.getStartTime() == 0) {
+        logger.error(
+            "before merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+            this.statistics,
+            pageWriter.getStatistics());
+        throw new IllegalStateException("before merge statistics' startTime is 0");
+      }
+
       this.statistics.mergeStatistics(pageWriter.getStatistics());
+
+      if (this.statistics.getStartTime() == 0) {
+        logger.error(
+            "after merge statistics' startTime is 0: {}, pageWriter statistics: {}",
+            this.statistics,
+            pageWriter.getStatistics());
+        throw new IllegalStateException("after merge statistics' startTime is 0");
+      }
     } catch (IOException e) {
       logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
     } finally {
@@ -449,6 +469,34 @@ public class ChunkWriterImpl implements IChunkWriter {
     // write all pages of this column
     writer.writeBytesToStream(pageBuffer);
 
+    ChunkHeader header =
+        new ChunkHeader(
+            measurementSchema.getMeasurementId(),
+            pageBuffer.size(),
+            measurementSchema.getType(),
+            compressor.getType(),
+            measurementSchema.getEncodingType(),
+            numOfPages);
+    try {
+      Chunk chunk =
+          new Chunk(
+              header,
+              ByteBuffer.wrap(pageBuffer.toByteArray()),
+              Collections.emptyList(),
+              statistics);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
+      while (chunkReader.hasNextSatisfiedPage()) {
+        chunkReader.nextPageData();
+      }
+    } catch (Throwable e) {
+      logger.error(
+          "Verify chunk failed, chunk statistics: {}, chunk header: {}, chunk data: {}",
+          this.statistics,
+          header,
+          Arrays.toString(pageBuffer.toByteArray()));
+      throw new IllegalStateException("Verify chunk failed");
+    }
+
     int dataSize = (int) (writer.getPos() - dataOffset);
     if (dataSize != pageBuffer.size()) {
       throw new IOException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index afa41c5..696d799 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -247,7 +247,9 @@ public class PageWriter {
         channel.write(pageData);
       }
     } else {
+      int previousPosition = pageBuffer.size();
       pageBuffer.write(compressedBytes, 0, compressedSize);
+      if (pageBuffer.size() - previousPosition == compressedSize) {}
     }
     logger.trace("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
     return sizeWithoutStatistic;