You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/11 03:10:21 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] [IOTDB-996] Fix the recovery Bug (#2002)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 06e78a7  [To rel/0.11] [IOTDB-996] Fix the recovery Bug (#2002)
06e78a7 is described below

commit 06e78a7e8235c605e5a89a63a8c4fc84353a83d9
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Wed Nov 11 11:07:36 2020 +0800

    [To rel/0.11] [IOTDB-996] Fix the recovery Bug (#2002)
---
 .../level/LevelCompactionTsFileManagement.java     |  3 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 88 +++++++++++++++-------
 2 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 3d36f06..215f86a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -41,11 +41,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -60,6 +60,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   private static final Logger logger = LoggerFactory
       .getLogger(LevelCompactionTsFileManagement.class);
 
+
   private final int seqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
   private final int seqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
       .getSeqFileNumInEachLevel();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2b49583..90ad8cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -48,6 +48,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
@@ -58,8 +60,6 @@ import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
-import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BatchInsertionException;
@@ -86,6 +86,7 @@ import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
@@ -121,6 +122,7 @@ public class StorageGroupProcessor {
 
   public static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
   private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
+  private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
 
   /**
    * All newly generated chunks after merge have version number 0, so we set merged Modification
@@ -557,12 +559,15 @@ public class StorageGroupProcessor {
 
   private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
     for (int i = 0; i < tsFiles.size(); i++) {
-      if (LevelCompactionTsFileManagement.getMergeLevel(tsFiles.get(i).getTsFile()) > 0) {
+      TsFileResource tsFileResource = tsFiles.get(i);
+      // this tsfile is not at level zero, so there is no need to perform recovery here
+      if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
+        tsFileResource.setClosed(true);
+        tsFileManagement.add(tsFileResource, isSeq);
         continue;
       }
-      TsFileResource tsFileResource = tsFiles.get(i);
-      long timePartitionId = tsFileResource.getTimePartition();
 
+      long timePartitionId = tsFileResource.getTimePartition();
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
           storageGroupName + FILE_NAME_SEPARATOR,
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
@@ -619,7 +624,7 @@ public class StorageGroupProcessor {
           long chunkMetadataSize = 0;
           for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
             for (List<ChunkMetadata> metadatas : metaMap.values()) {
-              for (ChunkMetadata chunkMetadata: metadatas) {
+              for (ChunkMetadata chunkMetadata : metadatas) {
                 chunkMetadataSize += chunkMetadata.calculateRamSize();
               }
             }
@@ -790,11 +795,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in insertTabletPlan
+   * @param end              end index of rows to be inserted in insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -916,7 +921,7 @@ public class StorageGroupProcessor {
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
     writeLock();
     try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && 
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
           !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
@@ -952,9 +957,9 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence whether is sequence or not
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1092,7 +1097,7 @@ public class StorageGroupProcessor {
   public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
+    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
         closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
       return;
     }
@@ -1383,6 +1388,13 @@ public class StorageGroupProcessor {
       boolean isSeq)
       throws MetadataException {
 
+    if (config.isDebugOn()) {
+      DEBUG_LOGGER.info(
+          "Path: " + deviceId.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurementId
+              + ", get tsfile list: " + tsFileResources + " isSeq: " + isSeq + " timefilter: " + (
+              timeFilter == null ? "null" : timeFilter.toString()));
+    }
+
     MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId);
 
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
@@ -1431,6 +1443,10 @@ public class StorageGroupProcessor {
   private boolean isTsFileResourceSatisfied(TsFileResource tsFileResource, String deviceId,
       Filter timeFilter, boolean isSeq) {
     if (!tsFileResource.containsDevice(deviceId)) {
+      if (config.isDebugOn()) {
+        DEBUG_LOGGER.info("Path: " + deviceId + " file " + tsFileResource
+            + " is not satisfied because of no device!");
+      }
       return false;
     }
 
@@ -1440,11 +1456,20 @@ public class StorageGroupProcessor {
         : Long.MAX_VALUE;
 
     if (!isAlive(endTime)) {
+      if (config.isDebugOn()) {
+        DEBUG_LOGGER.info("Path: " + deviceId + " file " + tsFileResource
+            + " is not satisfied because of ttl!");
+      }
       return false;
     }
 
     if (timeFilter != null) {
-      return timeFilter.satisfyStartEndTime(startTime, endTime);
+      boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+      if (config.isDebugOn() && !res) {
+        DEBUG_LOGGER.info("Path: " + deviceId + " file " + tsFileResource
+            + " is not satisfied because of time filter!");
+      }
+      return res;
     }
     return true;
   }
@@ -1452,12 +1477,14 @@ public class StorageGroupProcessor {
   /**
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
-   *  @param path the timeseries path of the to be deleted.
+   *
+   * @param path      the timeseries path to be deleted.
    * @param startTime the startTime of delete range.
-   * @param endTime the endTime of delete range.
+   * @param endTime   the endTime of delete range.
    * @param planIndex
    */
-  public void delete(PartialPath path, long startTime, long endTime, long planIndex) throws IOException {
+  public void delete(PartialPath path, long startTime, long endTime, long planIndex)
+      throws IOException {
     // TODO: how to avoid partial deletion?
     // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
     //mod files in mergingModification, sequenceFileList, and unsequenceFileList
@@ -1535,11 +1562,12 @@ public class StorageGroupProcessor {
   }
 
   private boolean canSkipDelete(TsFileResource tsFileResource, Set<PartialPath> devicePaths,
-       long deleteStart, long deleteEnd) {
+      long deleteStart, long deleteEnd) {
     for (PartialPath device : devicePaths) {
       if (tsFileResource.containsDevice(device.getFullPath()) &&
-              (deleteEnd >= tsFileResource.getStartTime(device.getFullPath()) &&
-               deleteStart <= tsFileResource.getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
+          (deleteEnd >= tsFileResource.getStartTime(device.getFullPath()) &&
+              deleteStart <= tsFileResource
+                  .getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
         return false;
       }
     }
@@ -1548,9 +1576,10 @@ public class StorageGroupProcessor {
 
   private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion,
       Set<PartialPath> devicePaths, List<ModificationFile> updatedModFiles, long planIndex)
-          throws IOException {
+      throws IOException {
     for (TsFileResource tsFileResource : tsFileResourceList) {
-      if (canSkipDelete(tsFileResource, devicePaths, deletion.getStartTime(), deletion.getEndTime())) {
+      if (canSkipDelete(tsFileResource, devicePaths, deletion.getStartTime(),
+          deletion.getEndTime())) {
         continue;
       }
 
@@ -1584,7 +1613,8 @@ public class StorageGroupProcessor {
       MNode node = IoTDB.metaManager.getDeviceNode(deviceId);
 
       for (MNode measurementNode : node.getChildren().values()) {
-        if (measurementNode != null && originalPath.matchFullPath(measurementNode.getPartialPath())) {
+        if (measurementNode != null && originalPath
+            .matchFullPath(measurementNode.getPartialPath())) {
           TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast();
           if (lastPair != null && startTime <= lastPair.getTimestamp()
               && lastPair.getTimestamp() <= endTime) {
@@ -2075,9 +2105,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
    * version number is the version number in the tsfile with a larger timestamp.
    *
-   * @param tsfileName origin tsfile name
+   * @param tsfileName  origin tsfile name
    * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   * 1]
+   *                    1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2141,8 +2171,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
-   * @param tsFileResource tsfile resource to be loaded
+   * @param type            load type
+   * @param tsFileResource  tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.