You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/06 05:06:06 UTC

[iotdb] branch master updated: Cherry-pick 4 bug-fixing commits about upgrade tool from rel/0.12 to master branch (#3698)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3234abf  Cherry-pick 4 bug-fixing commits about upgrade tool from rel/0.12 to master branch (#3698)
3234abf is described below

commit 3234abf97f1f7900eb7da2a3d68c1f5ceeb91bde
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri Aug 6 13:05:45 2021 +0800

    Cherry-pick 4 bug-fixing commits about upgrade tool from rel/0.12 to master branch (#3698)
    
    * Fix upgrade tool cannot close file reader (#3319)
    
    * [To rel/0.12] Fix removing tmp folders logic in upgrade tool (#3186)
    
    * [To rel/0.12] Fix upgrade NPE and DeadLock (#3329)
    
    * Fix upgrade NPE and DeadLock
    
    * Fix get ungrade file num deadlock
    
    * [ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
---
 .../engine/storagegroup/StorageGroupProcessor.java |  3 ++
 .../iotdb/db/engine/upgrade/UpgradeTask.java       | 60 ++++++++++++++++++----
 .../org/apache/iotdb/db/service/UpgradeSevice.java | 24 ++-------
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   | 30 +++++++----
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  4 +-
 .../org/apache/iotdb/db/utils/UpgradeUtils.java    | 10 ----
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 28 +++++++---
 7 files changed, 102 insertions(+), 57 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index af54491..77d68b1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2180,6 +2180,7 @@ public class StorageGroupProcessor {
       return;
     }
     for (TsFileResource resource : resources) {
+      resource.writeLock();
       try {
         UpgradeUtils.moveUpgradedFiles(resource);
         tsFileManagement.addAll(resource.getUpgradedResources(), isseq);
@@ -2193,6 +2194,8 @@ public class StorageGroupProcessor {
             resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS);
       } catch (IOException e) {
         logger.error("Unable to load {}, caused by ", resource, e);
+      } finally {
+        resource.writeUnlock();
       }
     }
     // delete upgrade folder when it is empty
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index a2e8357..bcf29d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.upgrade;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.service.UpgradeSevice;
 import org.apache.iotdb.db.tools.upgrade.TsFileOnlineUpgradeTool;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -59,19 +61,17 @@ public class UpgradeTask extends WrappedRunnable {
         logger.info("find upgraded file for {}", upgradeResource.getTsFile());
         upgradedResources = findUpgradedFiles();
       }
-      upgradeResource.writeLock();
-      try {
-        upgradeResource.setUpgradedResources(upgradedResources);
-        upgradeResource.getUpgradeTsFileResourceCallBack().call(upgradeResource);
-      } finally {
-        upgradeResource.writeUnlock();
-      }
-      UpgradeSevice.setCntUpgradeFileNum(UpgradeSevice.getCntUpgradeFileNum() - 1);
+      upgradeResource.setUpgradedResources(upgradedResources);
+      upgradeResource.getUpgradeTsFileResourceCallBack().call(upgradeResource);
+      UpgradeSevice.getTotalUpgradeFileNum().getAndAdd(-1);
       logger.info(
           "Upgrade completes, file path:{} , the remaining upgraded file num: {}",
           oldTsfilePath,
-          UpgradeSevice.getCntUpgradeFileNum());
-      if (UpgradeSevice.getCntUpgradeFileNum() == 0) {
+          UpgradeSevice.getTotalUpgradeFileNum().get());
+      if (UpgradeSevice.getTotalUpgradeFileNum().get() == 0) {
+        logger.info("Start delete empty tmp folders");
+        clearTmpFolders(DirectoryManager.getInstance().getAllSequenceFileFolders());
+        clearTmpFolders(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
         UpgradeSevice.getINSTANCE().stop();
         logger.info("All files upgraded successfully! ");
       }
@@ -126,4 +126,44 @@ public class UpgradeTask extends WrappedRunnable {
     }
     return upgradedResources;
   }
+
+  private void clearTmpFolders(List<String> folders) {
+    for (String baseDir : folders) {
+      File fileFolder = fsFactory.getFile(baseDir);
+      if (!fileFolder.isDirectory()) {
+        continue;
+      }
+      for (File storageGroup : fileFolder.listFiles()) {
+        if (!storageGroup.isDirectory()) {
+          continue;
+        }
+        File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
+        File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
+        if (upgradeDir == null) {
+          continue;
+        }
+        File[] tmpPartitionDirList = upgradeDir.listFiles();
+        if (tmpPartitionDirList == null) {
+          continue;
+        }
+        for (File tmpPartitionDir : tmpPartitionDirList) {
+          if (tmpPartitionDir.isDirectory()) {
+            try {
+              Files.delete(tmpPartitionDir.toPath());
+            } catch (IOException e) {
+              logger.error("Delete tmpPartitionDir {} failed", tmpPartitionDir);
+            }
+          }
+        }
+        // delete upgrade folder when it is empty
+        if (upgradeDir.isDirectory()) {
+          try {
+            Files.delete(upgradeDir.toPath());
+          } catch (IOException e) {
+            logger.error("Delete tmpUpgradeDir {} failed", upgradeDir);
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java b/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
index 4468208..faa3c39 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
@@ -38,7 +38,7 @@ public class UpgradeSevice implements IService {
 
   private ExecutorService upgradeThreadPool;
   private AtomicInteger threadCnt = new AtomicInteger();
-  private static int cntUpgradeFileNum;
+  private static AtomicInteger cntUpgradeFileNum = new AtomicInteger();
 
   private UpgradeSevice() {}
 
@@ -63,7 +63,7 @@ public class UpgradeSevice implements IService {
             updateThreadNum, r -> new Thread(r, "UpgradeThread-" + threadCnt.getAndIncrement()));
     UpgradeLog.createUpgradeLog();
     countUpgradeFiles();
-    if (cntUpgradeFileNum == 0) {
+    if (cntUpgradeFileNum.get() == 0) {
       stop();
       return;
     }
@@ -87,22 +87,8 @@ public class UpgradeSevice implements IService {
     return ServiceType.UPGRADE_SERVICE;
   }
 
-  public static void setCntUpgradeFileNum(int cntUpgradeFileNum) {
-    UpgradeUtils.getCntUpgradeFileLock().writeLock().lock();
-    try {
-      UpgradeSevice.cntUpgradeFileNum = cntUpgradeFileNum;
-    } finally {
-      UpgradeUtils.getCntUpgradeFileLock().writeLock().unlock();
-    }
-  }
-
-  public static int getCntUpgradeFileNum() {
-    UpgradeUtils.getCntUpgradeFileLock().readLock().lock();
-    try {
-      return cntUpgradeFileNum;
-    } finally {
-      UpgradeUtils.getCntUpgradeFileLock().readLock().unlock();
-    }
+  public static AtomicInteger getTotalUpgradeFileNum() {
+    return cntUpgradeFileNum;
   }
 
   public void submitUpgradeTask(UpgradeTask upgradeTask) {
@@ -110,7 +96,7 @@ public class UpgradeSevice implements IService {
   }
 
   private static void countUpgradeFiles() {
-    cntUpgradeFileNum = StorageEngine.getInstance().countUpgradeFiles();
+    cntUpgradeFileNum.addAndGet(StorageEngine.getInstance().countUpgradeFiles());
     logger.info("finish counting upgrading files, total num:{}", cntUpgradeFileNum);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index b356dd3..c52457b 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -107,6 +108,23 @@ public class TsFileRewriteTool implements AutoCloseable {
     }
   }
 
+  public TsFileRewriteTool(TsFileResource resourceToBeRewritten, boolean needReaderForV2)
+      throws IOException {
+    oldTsFile = resourceToBeRewritten.getTsFile();
+    String file = oldTsFile.getAbsolutePath();
+    if (needReaderForV2) {
+      reader = new TsFileSequenceReaderForV2(file);
+    } else {
+      reader = new TsFileSequenceReader(file);
+    }
+    partitionWriterMap = new HashMap<>();
+    if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFile.FILE_SUFFIX).exists()) {
+      oldModification = (List<Modification>) resourceToBeRewritten.getModFile().getModifications();
+      modsIterator = oldModification.iterator();
+      fileModificationMap = new HashMap<>();
+    }
+  }
+
   /**
    * Rewrite an old file to the latest version
    *
@@ -302,17 +320,12 @@ public class TsFileRewriteTool implements AutoCloseable {
       List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
       List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
       valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
-      boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1;
       for (int j = 0; j < pageDataInChunk.size(); j++) {
         if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
           decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
         } else {
           writePageInToFile(
-              schema,
-              pageHeadersInChunk.get(j),
-              pageDataInChunk.get(j),
-              chunkWritersInChunkGroup,
-              isOnlyOnePageChunk);
+              schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
         }
       }
     }
@@ -372,15 +385,14 @@ public class TsFileRewriteTool implements AutoCloseable {
       IMeasurementSchema schema,
       PageHeader pageHeader,
       ByteBuffer pageData,
-      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup,
-      boolean isOnlyOnePageChunk)
+      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
       throws PageException {
     long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
     getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
     Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
         chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
     ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
-    chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk);
+    chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
     chunkWriters.put(schema, chunkWriter);
     chunkWritersInChunkGroup.put(partitionId, chunkWriters);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 6d0192c..8b04bba 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -62,9 +62,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
    * @throws IOException If some I/O error occurs
    */
   public TsFileOnlineUpgradeTool(TsFileResource resourceToBeUpgraded) throws IOException {
-    super(resourceToBeUpgraded);
-    String file = oldTsFile.getAbsolutePath();
-    reader = new TsFileSequenceReaderForV2(file);
+    super(resourceToBeUpgraded, true);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
index 87dd910..f8e2b82 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
@@ -122,16 +122,6 @@ public class UpgradeUtils {
       upgradedResource.serialize();
       // delete generated temp resource file
       Files.delete(tempResourceFile.toPath());
-      // delete tmp partition folder when it is empty
-      File tmpPartitionDir = upgradedFile.getParentFile();
-      if (tmpPartitionDir.isDirectory() && tmpPartitionDir.listFiles().length == 0) {
-        Files.delete(tmpPartitionDir.toPath());
-      }
-      // delete upgrade folder when it is empty
-      File upgradeDir = tmpPartitionDir.getParentFile();
-      if (upgradeDir.isDirectory() && upgradeDir.listFiles().length == 0) {
-        Files.delete(upgradeDir.toPath());
-      }
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 078df52..e66ed69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -378,16 +378,32 @@ public class ChunkWriterImpl implements IChunkWriter {
    * write the page header and data into the PageWriter's output stream. @NOTE: for upgrading
    * 0.11/v2 to 0.12/v3 TsFile
    */
-  public void writePageHeaderAndDataIntoBuff(
-      ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws PageException {
-
+  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+      throws PageException {
     // write the page header to pageBuffer
     try {
       logger.debug(
           "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
-      ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
-      ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
-      if (!isOnlyOnePageChunk) {
+      // serialize pageHeader  see writePageToPageBuffer method
+      if (numOfPages == 0) { // record the firstPageStatistics
+        this.firstPageStatistics = header.getStatistics();
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        this.sizeWithoutStatistic +=
+            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        byte[] b = pageBuffer.toByteArray();
+        pageBuffer.reset();
+        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+        firstPageStatistics.serialize(pageBuffer);
+        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+        header.getStatistics().serialize(pageBuffer);
+        firstPageStatistics = null;
+      } else {
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
         header.getStatistics().serialize(pageBuffer);
       }
       logger.debug(