You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/31 12:01:43 UTC

[incubator-iotdb] branch fix_many_bugs updated: fix partition -1 when recover

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_many_bugs
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/fix_many_bugs by this push:
     new e7c387d  fix partition -1 when recover
e7c387d is described below

commit e7c387d5ee4ba2d7afeec7be498a845f08d56882
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Mar 31 20:02:08 2020 +0800

    fix partition -1 when recover
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  3 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 45 +++++++---------------
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |  5 +++
 .../writelog/recover/TsFileRecoverPerformer.java   |  4 --
 4 files changed, 20 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3ef7e0a..0e6641c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,8 +498,7 @@ public class StorageEngine implements IService {
         if (!sequenceFile.isClosed()) {
           continue;
         }
-        String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
-        long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
+        long partitionNum = FilePathUtils.getTsFileResourcePartition(sequenceFile);
         Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
             , n -> new HashMap<>());
         storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 97c2be9..07718a4 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -262,8 +262,7 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
-        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        long partitionNum = FilePathUtils.getTsFileResourcePartition(resource);
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
       for (TsFileResource resource : unseqTsFiles) {
@@ -271,8 +270,7 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
-        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        long partitionNum = FilePathUtils.getTsFileResourcePartition(resource);
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
 
@@ -296,30 +294,16 @@ public class StorageGroupProcessor {
     }
 
     for (TsFileResource resource : sequenceFileTreeSet) {
-      long timePartitionId = getTimePartitionFromTsFileResource(resource);
-      if (timePartitionId != -1) {
-        latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
-            .putAll(resource.getEndTimeMap());
-        partitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-            .putAll(resource.getEndTimeMap());
-        globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
-      }
+      long timePartitionId = FilePathUtils.getTsFileResourcePartition(resource);
+      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+          .putAll(resource.getEndTimeMap());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+          .putAll(resource.getEndTimeMap());
+      globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
     }
   }
 
-  private long getTimePartitionFromTsFileResource(TsFileResource resource) {
-    // device id -> start map
-    // if start time map is empty, tsfile resource is empty, return -1;
-    Map<String, Long> startTimeMap = resource.getStartTimeMap();
-    // just find any time of device
-    Iterator<Long> iterator = startTimeMap.values().iterator();
-    if (iterator.hasNext()) {
-      return StorageEngine.getTimePartition(iterator.next());
-    }
-
-    return -1;
-  }
 
   /**
    * get version controller by time partition Id Thread-safety should be ensure by caller
@@ -393,7 +377,7 @@ public class StorageGroupProcessor {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       sequenceFileTreeSet.add(tsFileResource);
-      long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long timePartitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
@@ -422,7 +406,7 @@ public class StorageGroupProcessor {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       unSequenceFileList.add(tsFileResource);
-      long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long timePartitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
@@ -1219,7 +1203,7 @@ public class StorageGroupProcessor {
         continue;
       }
 
-      long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long partitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
       deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());
 
       // write deletion into modification file
@@ -1566,7 +1550,7 @@ public class StorageGroupProcessor {
         // check whether the file name needs to be renamed.
         if (!sequenceFileTreeSet.isEmpty()) {
           String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
-              getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
+              FilePathUtils.getTsFileResourcePartition(newTsFileResource), sequenceList);
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
@@ -1579,8 +1563,7 @@ public class StorageGroupProcessor {
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
-      String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
-      long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+      long partitionNum = FilePathUtils.getTsFileResourcePartition(newTsFileResource);
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
     } catch (DiskSpaceInsufficientException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index d89e6fb..b5168b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -45,4 +45,9 @@ public class FilePathUtils {
   public static String[] splitTsFilePath(TsFileResource resource) {
     return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
   }
+
+  public static long getTsFileResourcePartition(TsFileResource resource) {
+    String[] splits = splitTsFilePath(resource);
+    return Long.parseLong(splits[splits.length - 2]);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index ad22e19..1e05b9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -81,9 +81,6 @@ public class TsFileRecoverPerformer {
    */
   public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
 
-    IMemTable recoverMemTable = new PrimitiveMemTable();
-    this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
-        versionController, resource, recoverMemTable, acceptUnseq);
     File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
     if (!insertFile.exists()) {
       logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
@@ -197,7 +194,6 @@ public class TsFileRecoverPerformer {
     try {
       if (!recoverMemTable.isEmpty()) {
         // flush logs
-
         MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
             restorableTsFileIOWriter, resource.getFile().getParentFile().getParentFile().getName());
         tableFlushTask.syncFlushMemTable();