You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/02/21 13:40:20 UTC

[iotdb] branch proceeding_vldb updated: finish tired compaction

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/proceeding_vldb by this push:
     new 327e1da  finish tired compaction
327e1da is described below

commit 327e1da96d61f26859b744dbbc140f53bf9be0aa
Author: EJTTianyu <16...@qq.com>
AuthorDate: Sun Feb 21 21:39:48 2021 +0800

    finish tired compaction
---
 .../tired/TiredCompactionTsFileManagement.java     | 15 +++++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 51 ++++++++++++++++++----
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
index f2f688b..e9146b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
@@ -38,6 +38,8 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.MManager;
@@ -45,6 +47,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -343,9 +346,9 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    if (isCompactionWorking()) {
-      return;
-    }
+//    if (isCompactionWorking()) {
+//      return;
+//    }
     Map<Long, Map<Long, List<TsFileResource>>> selectFiles = selectMergeFile(timePartition);
     mergeFiles(selectFiles, timePartition);
   }
@@ -444,7 +447,7 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
       newResource.setHistoricalVersions(historicalVersions);
       newResource.serialize();
       newFileWriter.endFile();
-
+      newResource.close();
       cleanUp(resources, newResource, mergedLevel, timePartition, parentPath);
     } catch (Exception e) {
       //TODO do nothing
@@ -477,6 +480,10 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
         }
         for (TsFileResource deleteRes : res.getValue()) {
           deleteRes.delete();
+          ChunkMetadataCache.getInstance().clear();
+          TimeSeriesMetadataCache.getInstance().clear();
+          FileReaderManager.getInstance().closeFileAndRemoveReader(deleteRes.getTsFilePath());
+          deleteRes.delete();
         }
       }
       File oldFile = newTsResource.getTsFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 65fdbb0..28d7cbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -697,15 +697,21 @@ public class PlanExecutor implements IPlanExecutor {
 //      为了 vldb 投稿, 根据文件 version 加载数据文件
 //      数据文件夹格式 sg -- seq
 //                      -- unseq
-      loadVldbDataDir(file, plan);
+      if (IoTDBDescriptor.getInstance().getConfig().isOverlapSplit()) {
+        loadVldbDataDir(file, plan);
+      } else {
+        loadVldbDataDirNoSplit(file, plan);
+      }
     } else {
       loadFile(file, plan);
     }
   }
 
+  // 用于处理顺序,乱序分离的情况
   private void loadVldbDataDir(File curFile, OperateFilePlan plan) throws QueryProcessException{
     File[] files = curFile.listFiles();
     List<File> fileList = new ArrayList<>();
+    List<File> unseqTmpList = new ArrayList<>();
     Map<Boolean, List<File>> fileBySeq = new HashMap<>();
     fileBySeq.put(true, new ArrayList<>());
     fileBySeq.put(false, new ArrayList<>());
@@ -718,23 +724,31 @@ public class PlanExecutor implements IPlanExecutor {
     }
     for (File file : files[1].listFiles()) {
       if (file.getName().endsWith(TSFILE_SUFFIX)) {
-        fileList.add(file);
+        unseqTmpList.add(file);
         fileBySeq.get(false).add(file);
       }
     }
-    File maxFile = null;
-    if (!fileBySeq.get(true).isEmpty()) {
-       maxFile = fileList.remove(fileList.size() - 1);
-    }
     Collections.sort(fileList, new Comparator<File>() {
       @Override
       public int compare(File o1, File o2) {
         return TsFileManagement.compareFileName(o1, o2);
       }
     });
-    if (!fileBySeq.get(true).isEmpty()) {
-      fileList.add(maxFile);
+    Collections.sort(unseqTmpList, new Comparator<File>() {
+      @Override
+      public int compare(File o1, File o2) {
+        return TsFileManagement.compareFileName(o1, o2);
+      }
+    });
+    for (int i = 1; i < unseqTmpList.size(); i++){
+      for (int j =0; j < fileList.size() ; j++){
+        if (TsFileManagement.compareFileName(unseqTmpList.get(i), fileList.get(j)) == -1){
+          fileList.add(j - 1, unseqTmpList.get(i - 1));
+          break;
+        }
+      }
     }
+    fileList.add(unseqTmpList.get(unseqTmpList.size() - 1));
     for (File file : fileList) {
       if (fileBySeq.get(true).contains(file)) {
         loadFileBySeq(file, true, plan);
@@ -744,6 +758,27 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  // 用于处理顺序,乱序不分离的情况
+  private void loadVldbDataDirNoSplit(File curFile, OperateFilePlan plan)
+      throws QueryProcessException {
+    File[] files = curFile.listFiles();
+    List<File> fileList = new ArrayList<>();
+    for (File file : files[1].listFiles()) {
+      if (file.getName().endsWith(TSFILE_SUFFIX)) {
+        fileList.add(file);
+      }
+    }
+    Collections.sort(fileList, new Comparator<File>() {
+      @Override
+      public int compare(File o1, File o2) {
+        return TsFileManagement.compareFileName(o1, o2);
+      }
+    });
+    for (File file : fileList) {
+      loadFileBySeq(file, false, plan);
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new PlanExecutor().loadVldbDataDir(new File(
             "/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/sequence/root.group_1"),