You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/09 05:37:56 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 33ee473  [To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
33ee473 is described below

commit 33ee473d1f2476180b4b6207fc1d25ef73205a43
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Jun 9 13:37:17 2021 +0800

    [To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
---
 .../engine/storagegroup/StorageGroupProcessor.java |  3 +++
 .../iotdb/db/engine/upgrade/UpgradeTask.java       |  7 ++++-
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   | 12 +++------
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 31 ++++++++++++++++------
 4 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b2d4b60..ff626d6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2101,6 +2101,7 @@ public class StorageGroupProcessor {
       return;
     }
     for (TsFileResource resource : resources) {
+      resource.writeLock();
       try {
         UpgradeUtils.moveUpgradedFiles(resource);
         tsFileManagement.addAll(resource.getUpgradedResources(), isseq);
@@ -2114,6 +2115,8 @@ public class StorageGroupProcessor {
             resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS);
       } catch (IOException e) {
         logger.error("Unable to load {}, caused by ", resource, e);
+      } finally {
+        resource.writeUnlock();
       }
     }
     // delete upgrade folder when it is empty
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index 76b1bac..bcf29d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -139,8 +139,13 @@ public class UpgradeTask extends WrappedRunnable {
         }
         File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
         File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
-
+        if (upgradeDir == null) {
+          continue;
+        }
         File[] tmpPartitionDirList = upgradeDir.listFiles();
+        if (tmpPartitionDirList == null) {
+          continue;
+        }
         for (File tmpPartitionDir : tmpPartitionDirList) {
           if (tmpPartitionDir.isDirectory()) {
             try {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 955ed77..e1e4bc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -319,17 +319,12 @@ public class TsFileRewriteTool implements AutoCloseable {
       List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
       List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
       valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
-      boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1;
       for (int j = 0; j < pageDataInChunk.size(); j++) {
         if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
           decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
         } else {
           writePageInToFile(
-              schema,
-              pageHeadersInChunk.get(j),
-              pageDataInChunk.get(j),
-              chunkWritersInChunkGroup,
-              isOnlyOnePageChunk);
+              schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
         }
       }
     }
@@ -389,15 +384,14 @@ public class TsFileRewriteTool implements AutoCloseable {
       MeasurementSchema schema,
       PageHeader pageHeader,
       ByteBuffer pageData,
-      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup,
-      boolean isOnlyOnePageChunk)
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
       throws PageException {
     long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
     getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
     Map<MeasurementSchema, ChunkWriterImpl> chunkWriters =
         chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
     ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
-    chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk);
+    chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
     chunkWriters.put(schema, chunkWriter);
     chunkWritersInChunkGroup.put(partitionId, chunkWriters);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 39e5f79..48ff92c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -294,8 +294,7 @@ public class ChunkWriterImpl implements IChunkWriter {
       if (numOfPages == 0) { // record the firstPageStatistics
         this.firstPageStatistics = pageWriter.getStatistics();
         this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
-      } else if (numOfPages == 1
-          && firstPageStatistics != null) { // put the firstPageStatistics into pageBuffer
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
         byte[] b = pageBuffer.toByteArray();
         pageBuffer.reset();
         pageBuffer.write(b, 0, this.sizeWithoutStatistic);
@@ -374,16 +373,32 @@ public class ChunkWriterImpl implements IChunkWriter {
    * write the page header and data into the PageWriter's output stream. @NOTE: for upgrading
    * 0.11/v2 to 0.12/v3 TsFile
    */
-  public void writePageHeaderAndDataIntoBuff(
-      ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws PageException {
-
+  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+      throws PageException {
     // write the page header to pageBuffer
     try {
       logger.debug(
           "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
-      ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
-      ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
-      if (!isOnlyOnePageChunk) {
+      // serialize pageHeader  see writePageToPageBuffer method
+      if (numOfPages == 0) { // record the firstPageStatistics
+        this.firstPageStatistics = header.getStatistics();
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        byte[] b = pageBuffer.toByteArray();
+        pageBuffer.reset();
+        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+        firstPageStatistics.serialize(pageBuffer);
+        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+        firstPageStatistics = null;
+      } else {
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
         header.getStatistics().serialize(pageBuffer);
       }
       logger.debug(