Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/08/05 02:11:58 UTC

[GitHub] [incubator-iotdb] zhanglingzhe0820 opened a new pull request #1597: add tsfilemanage

zhanglingzhe0820 opened a new pull request #1597:
URL: https://github.com/apache/incubator-iotdb/pull/1597


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-iotdb] qiaojialin merged pull request #1597: add tsfilemanage-merge

Posted by GitBox <gi...@apache.org>.
qiaojialin merged pull request #1597:
URL: https://github.com/apache/incubator-iotdb/pull/1597


   





[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #1597: add tsfilemanage

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #1597:
URL: https://github.com/apache/incubator-iotdb/pull/1597#discussion_r468511549



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1182,6 +1179,13 @@ public void syncCloseAllWorkingTsFileProcessors() {
                 (System.currentTimeMillis() - startTime) / 1000);
           }
         }
+        while (hotCompactionMergeWorking) {
+          closeStorageGroupCondition.wait(100);

Review comment:
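       This loop waits on closeStorageGroupCondition, but nothing notifies it. You should call closeStorageGroupCondition.notify() (or notifyAll()) where the hot compaction merge finishes, so the wait does not rely only on the 100 ms timeout.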
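
A minimal sketch of the wait/notify pairing this comment asks for. The class `MergeGateSketch` and its method names are hypothetical and not part of the PR; `condition` stands in for `closeStorageGroupCondition` and `mergeWorking` for `hotCompactionMergeWorking`.

```java
/** Hypothetical stand-in for the close/merge coordination; not the PR's code. */
class MergeGateSketch {
  // Assumption: plays the role of closeStorageGroupCondition, used as a monitor.
  private final Object condition = new Object();
  // Assumption: plays the role of hotCompactionMergeWorking; guarded by `condition`.
  private boolean mergeWorking = false;

  /** Marks a merge as running. */
  void startMerge() {
    synchronized (condition) {
      mergeWorking = true;
    }
  }

  /** Marks the merge as finished and wakes every thread waiting on the monitor. */
  void finishMerge() {
    synchronized (condition) {
      mergeWorking = false;
      condition.notifyAll();
    }
  }

  /**
   * Blocks until no merge is running. The 100 ms timeout is kept as a safety net,
   * but the normal wake-up path is the notifyAll() in finishMerge().
   * Returns false if the waiting thread was interrupted.
   */
  boolean awaitMergeFinished() {
    synchronized (condition) {
      try {
        while (mergeWorking) {
          condition.wait(100);
        }
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  boolean isMergeWorking() {
    synchronized (condition) {
      return mergeWorking;
    }
  }
}
```

Both the flag update and the notification happen while holding the same monitor that the waiter uses, so the waiter cannot miss the state change between checking the flag and calling `wait`.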

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -1256,20 +1252,30 @@ public void setMergeFileStrategy(
     this.mergeFileStrategy = mergeFileStrategy;
   }
 
-  public boolean isEnableVm() {
-    return enableVm;
+
+  public TsFileManagementStrategy getTsFileManagementStrategy() {
+    return tsFileManagementStrategy;
+  }
+
+  public void setTsFileManagementStrategy(
+      TsFileManagementStrategy tsFileManagementStrategy) {
+    this.tsFileManagementStrategy = tsFileManagementStrategy;
+  }
+
+  public int getMaxFileNumInEachLevel() {
+    return maxFileNumInEachLevel;
   }
 
-  public void setEnableVm(boolean enableVm) {
-    this.enableVm = enableVm;
+  public void setMaxFileNumInEachLevel(int maxFileNumInEachLevel) {
+    this.maxFileNumInEachLevel = maxFileNumInEachLevel;

Review comment:
       It seems that you forgot to call this setter in IoTDBDescriptor, so the value is never loaded from the configuration file.
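
A sketch of what wiring the setter into a descriptor-style loader could look like. `LevelConfigSketch`, `LevelConfigLoaderSketch`, the property key `max_file_num_in_each_level`, and the default of 10 are all assumptions made for illustration; the real IoTDBDescriptor code and key name may differ.

```java
import java.util.Properties;

/** Hypothetical, trimmed-down stand-in for the config class. */
class LevelConfigSketch {
  // Assumption: the default value is illustrative only.
  private int maxFileNumInEachLevel = 10;

  int getMaxFileNumInEachLevel() {
    return maxFileNumInEachLevel;
  }

  void setMaxFileNumInEachLevel(int maxFileNumInEachLevel) {
    this.maxFileNumInEachLevel = maxFileNumInEachLevel;
  }
}

/** Hypothetical stand-in for the descriptor that copies properties into the config. */
class LevelConfigLoaderSketch {
  // Assumption: hypothetical property key, not confirmed by the PR.
  static final String MAX_FILE_NUM_KEY = "max_file_num_in_each_level";

  /** Reads the property if present; otherwise keeps the config's current value. */
  static void load(Properties properties, LevelConfigSketch conf) {
    String raw = properties.getProperty(
        MAX_FILE_NUM_KEY, Integer.toString(conf.getMaxFileNumInEachLevel()));
    conf.setMaxFileNumInEachLevel(Integer.parseInt(raw.trim()));
  }
}
```

Without a call like the one in `load`, the setter exists but a value placed in the properties file never reaches the config object.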

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -588,9 +591,9 @@
   private int defaultFillInterval = -1;
 
   /**
-   * default TTL for storage groups that are not set TTL by statements, in ms
-   * Notice: if this property is changed, previous created storage group which are not set TTL
-   * will also be affected.
+   * default TTL for storage groups that are not set TTL by statements, in ms Notice: if this
+   * property is changed, previous created storage group which are not set TTL will also be
+   * affected.

Review comment:
       Please change this comment back to its original formatting.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement.level;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelTsFileManagement extends TsFileManagement {
+
+  private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+  private int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final List<List<TsFileResource>> sequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> unSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
+  private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
+
+  public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+    super(storageGroupName, storageGroupDir);
+    clear();
+  }
+
+  private void deleteLevelFiles(List<TsFileResource> vmMergeTsFiles) {
+    logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+    for (TsFileResource vmMergeTsFile : vmMergeTsFiles) {
+      deleteLevelFile(vmMergeTsFile);
+    }
+    for (int i = 0; i < maxLevelNum; i++) {
+      sequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+      unSequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+    }
+  }
+
+  private static void deleteLevelFile(TsFileResource seqFile) {
+    seqFile.writeLock();
+    try {
+      ChunkMetadataCache.getInstance().remove(seqFile);
+      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+      seqFile.setDeleted(true);
+      if (seqFile.getTsFile().exists()) {
+        Files.delete(seqFile.getTsFile().toPath());
+      }
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+    } finally {
+      seqFile.writeUnlock();
+    }
+  }
+
+  private void flushAllFilesToLastLevel(List<List<TsFileResource>> currMergeFiles,
+      HotCompactionLogger hotCompactionLogger, boolean sequence,
+      ReadWriteLock hotCompactionMergeLock) throws IOException {
+    TsFileResource sourceFile = currMergeFiles.get(0).get(0);
+    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1);
+    TsFileResource targetResource = new TsFileResource(newTargetFile);
+    HotCompactionUtils.merge(targetResource, getTsFileList(sequence),
+        storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);
+    hotCompactionLogger.logFullMerge();
+    hotCompactionLogger.logSequence(sequence);
+    hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
+    hotCompactionMergeLock.writeLock().lock();
+    for (int i = 0; i < maxLevelNum - 1; i++) {
+      deleteLevelFiles(currMergeFiles.get(i));
+    }
+    hotCompactionMergeLock.writeLock().unlock();
+    hotCompactionLogger.logMergeFinish();
+  }
+
+  @Override
+  public List<TsFileResource> getMergeTsFileList(boolean sequence) {
+    if (sequence) {
+      return sequenceTsFileResources.get(maxLevelNum - 1);
+    } else {
+      return unSequenceTsFileResources.get(maxLevelNum - 1);
+    }
+  }
+
+  @Override
+  public List<TsFileResource> getTsFileList(boolean sequence) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      for (int i = sequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(sequenceTsFileResources.get(i));
+      }
+    } else {
+      for (int i = unSequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(unSequenceTsFileResources.get(i));
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public Iterator<TsFileResource> getIterator(boolean sequence) {
+    return getTsFileList(sequence).iterator();
+  }
+
+  @Override
+  public void remove(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        sequenceTsFileResource.remove(tsFileResource);
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        unSequenceTsFileResource.remove(tsFileResource);
+      }
+    }
+  }
+
+  @Override
+  public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        sequenceTsFileResource.removeAll(tsFileResourceList);
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        unSequenceTsFileResource.removeAll(tsFileResourceList);
+      }
+    }
+  }
+
+  @Override
+  public void add(TsFileResource tsFileResource, boolean sequence) {
+    int level = getMergeLevel(tsFileResource.getTsFile());
+    if (level <= maxLevelNum - 1) {
+      if (sequence) {
+        sequenceTsFileResources.get(level).add(tsFileResource);
+      } else {
+        unSequenceTsFileResources.get(level).add(tsFileResource);
+      }
+    } else {
+      if (sequence) {
+        sequenceTsFileResources.get(maxLevelNum).add(tsFileResource);
+      } else {
+        unSequenceTsFileResources.get(maxLevelNum).add(tsFileResource);
+      }
+    }
+  }
+
+  @Override
+  public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      add(tsFileResource, sequence);
+    }
+  }
+
+  @Override
+  public void addMerged(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      sequenceTsFileResources.get(maxLevelNum - 1).add(tsFileResource);
+    } else {
+      unSequenceTsFileResources.get(maxLevelNum - 1).add(tsFileResource);
+    }
+  }
+
+  @Override
+  public void addMergedAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    if (sequence) {
+      sequenceTsFileResources.get(maxLevelNum - 1).addAll(tsFileResourceList);
+    } else {
+      unSequenceTsFileResources.get(maxLevelNum - 1).addAll(tsFileResourceList);
+    }
+  }
+
+  @Override
+  public boolean contains(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (sequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (unSequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void clear() {
+    sequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum + 1; i++) {
+      sequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+    unSequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum + 1; i++) {
+      unSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+  }
+
+  @Override
+  public boolean isEmpty(boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (!sequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (!unSequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int size(boolean sequence) {
+    int result = 0;
+    if (sequence) {
+      for (int i = sequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result += sequenceTsFileResources.size();
+      }
+    } else {
+      for (int i = unSequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result += unSequenceTsFileResources.size();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * recover files
+   */
+  @Override
+  public void recover() {
+    File logFile = FSFactoryProducer.getFSFactory()
+        .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
+    try {
+      if (logFile.exists()) {
+        HotCompactionLogAnalyzer logAnalyzer = new HotCompactionLogAnalyzer(logFile);
+        logAnalyzer.analyze();
+        Set<String> deviceSet = logAnalyzer.getDeviceSet();
+        List<File> sourceFileList = logAnalyzer.getSourceFiles();
+        long offset = logAnalyzer.getOffset();
+        File targetFile = logAnalyzer.getTargetFile();
+        boolean isMergeFinished = logAnalyzer.isMergeFinished();
+        boolean fullMerge = logAnalyzer.isFullMerge();
+        boolean isSeq = logAnalyzer.isSeq();
+        if (targetFile == null) {
+          return;
+        }
+        if (fullMerge) {
+          if (!isMergeFinished) {
+            RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+            writer.getIOWriterOut().truncate(offset - 1);
+            writer.close();
+            HotCompactionUtils
+                .merge(new TsFileResource(targetFile), getTsFileList(isSeq), storageGroupName,
+                    new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
+            if (isSeq) {
+              for (List<TsFileResource> currMergeFile : sequenceTsFileResources) {
+                deleteLevelFiles(currMergeFile);
+              }
+            } else {
+              for (List<TsFileResource> currMergeFile : unSequenceTsFileResources) {
+                deleteLevelFiles(currMergeFile);
+              }
+            }
+          }
+        } else {
+          TsFileResource targetResource = new TsFileResource(targetFile);
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+          if (sourceFileList.isEmpty()) {
+            return;
+          }
+          int level = getMergeLevel(sourceFileList.get(0));
+          if (!isMergeFinished) {
+            if (deviceSet.isEmpty()) {
+              Files.delete(targetFile.toPath());
+            } else {
+              writer.getIOWriterOut().truncate(offset - 1);
+              writer.close();
+              if (isSeq) {
+                HotCompactionUtils
+                    .merge(targetResource, sequenceTsFileResources.get(level),
+                        storageGroupName,
+                        new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+                        true);
+                deleteLevelFiles(sequenceTsFileResources.get(level));
+                sequenceTsFileResources.get(level + 1).add(targetResource);
+              } else {
+                HotCompactionUtils
+                    .merge(targetResource, unSequenceTsFileResources.get(level),
+                        storageGroupName,
+                        new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+                        false);
+                deleteLevelFiles(unSequenceTsFileResources.get(level));
+                unSequenceTsFileResources.get(level + 1).add(targetResource);
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      logger.error("recover vm error ", e);
+    } finally {
+      if (logFile.exists()) {
+        try {
+          Files.delete(logFile.toPath());
+        } catch (IOException e) {
+          logger.error("delete vm log file error ", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void forkCurrentFileList() {
+    forkTsFileList(forkedSequenceTsFileResources, sequenceTsFileResources);
+    forkTsFileList(forkedUnSequenceTsFileResources, unSequenceTsFileResources);
+  }
+
+  private void forkTsFileList(List<List<TsFileResource>> forkedSequenceTsFileResources,
+      List<List<TsFileResource>> sequenceTsFileResources) {
+    forkedSequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum - 1; i++) {

Review comment:
       The size of sequenceTsFileResources is `maxLevelNum + 1`, so should the loop bound `maxLevelNum - 1` be `maxLevelNum`?
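
To make the bound question concrete, here is a small sketch that forks a list of levels up to a given bound. `LevelForkSketch`, its method names, and the use of `String` in place of `TsFileResource` are assumptions for illustration; which bound is correct for the PR is still the author's call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; names are illustrative and not taken from the PR. */
class LevelForkSketch {

  /** Builds maxLevelNum + 1 empty levels, mirroring what clear() does in the diff. */
  static List<List<String>> newLevels(int maxLevelNum) {
    List<List<String>> levels = new ArrayList<>();
    for (int i = 0; i < maxLevelNum + 1; i++) {
      levels.add(new ArrayList<>());
    }
    return levels;
  }

  /** Copies levels [0, levelBound) of source into forked, replacing its old contents. */
  static void forkLevels(
      List<List<String>> forked, List<List<String>> source, int levelBound) {
    forked.clear();
    for (int i = 0; i < levelBound; i++) {
      forked.add(new ArrayList<>(source.get(i)));
    }
  }
}
```

With `maxLevelNum = 3` the source holds four levels (indices 0 to 3). A bound of `maxLevelNum - 1` copies only levels 0 and 1, so a file sitting in level 2 is missing from the fork, while a bound of `maxLevelNum` copies levels 0 to 2.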

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+  @Override
+  public void forkCurrentFileList() {
+    forkTsFileList(forkedSequenceTsFileResources, sequenceTsFileResources);
+    forkTsFileList(forkedUnSequenceTsFileResources, unSequenceTsFileResources);
+  }
+
+  private void forkTsFileList(List<List<TsFileResource>> forkedSequenceTsFileResources,
+      List<List<TsFileResource>> sequenceTsFileResources) {

Review comment:
       Please rename these parameters. The method is called with both the sequence and the unsequence lists, so names such as sequenceTsFileResources are misleading here.
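
       A minimal sketch of what the rename could look like. The names forkedLists and sourceLists are hypothetical, a type parameter stands in for TsFileResource, and the loop body is an assumption because the diff context ends before it; only the clear() call and the loop bound come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class ForkSketch {

  // Hypothetical neutral parameter names: the method serves both the
  // sequence and the unsequence lists, so neither name mentions "sequence".
  static <T> void forkTsFileList(List<List<T>> forkedLists, List<List<T>> sourceLists,
      int maxLevelNum) {
    forkedLists.clear();
    // Loop bound copied from the diff; copying each level is an assumption.
    for (int i = 0; i < maxLevelNum - 1; i++) {
      forkedLists.add(new ArrayList<>(sourceLists.get(i)));
    }
  }

  public static void main(String[] args) {
    List<List<String>> source = List.of(List.of("a.tsfile"), List.of("b.tsfile"), List.of());
    List<List<String>> forked = new ArrayList<>();
    forkTsFileList(forked, source, 3);
    System.out.println(forked);
  }
}
```

       With neutral names, the two calls in forkCurrentFileList() read the same way for both kinds of file.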

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseHotCompactionMergeCallBack;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+public abstract class TsFileManagement {
+
+  protected String storageGroupName;
+  protected String storageGroupDir;
+
+  public TsFileManagement(String storageGroupName, String storageGroupDir) {
+    this.storageGroupName = storageGroupName;
+    this.storageGroupDir = storageGroupDir;
+  }
+
+  public abstract List<TsFileResource> getMergeTsFileList(boolean sequence);
+
+  public abstract List<TsFileResource> getTsFileList(boolean sequence);
+
+  public abstract Iterator<TsFileResource> getIterator(boolean sequence);
+
+  public abstract void remove(TsFileResource tsFileResource, boolean sequence);
+
+  public abstract void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence);
+
+  public abstract void add(TsFileResource tsFileResource, boolean sequence);
+
+  public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean sequence);
+
+  public abstract void addMerged(TsFileResource tsFileResource, boolean sequence);
+
+  public abstract void addMergedAll(List<TsFileResource> tsFileResourceList, boolean sequence);
+
+  public abstract boolean contains(TsFileResource tsFileResource, boolean sequence);
+
+  public abstract void clear();
+
+  public abstract boolean isEmpty(boolean sequence);
+
+  public abstract int size(boolean sequence);
+
+  public abstract void recover();
+
+  public abstract void forkCurrentFileList();
+
+  protected abstract void merge(ReadWriteLock hotCompactionMergeLock);

Review comment:
       It would be better to add Javadoc for these abstract methods. It will help anyone who wants to implement another TsFileManagement.
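
       As an illustration only, the sketch below shows the kind of Javadoc this could mean on a trimmed-down stand-in for the class. FileManagementSketch and FlatFileManagementSketch are hypothetical names, T replaces TsFileResource, and the wording of each comment is a guess at the intended contract that the author would need to confirm.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down stand-in for TsFileManagement; T replaces TsFileResource. */
abstract class FileManagementSketch<T> {

  /**
   * Returns every managed file of the given kind.
   *
   * @param sequence true for sequence files, false for unsequence files
   * @return a list of the managed files of that kind
   */
  abstract List<T> getTsFileList(boolean sequence);

  /**
   * Registers a file with the manager.
   *
   * @param file the file to add
   * @param sequence true for sequence files, false for unsequence files
   */
  abstract void add(T file, boolean sequence);

  /**
   * Returns the number of managed files of the given kind.
   *
   * @param sequence true for sequence files, false for unsequence files
   */
  abstract int size(boolean sequence);
}

/** Simplest possible implementation: one flat list per kind of file. */
final class FlatFileManagementSketch<T> extends FileManagementSketch<T> {

  private final List<T> seq = new ArrayList<>();
  private final List<T> unseq = new ArrayList<>();

  @Override
  List<T> getTsFileList(boolean sequence) {
    // Return a copy so callers cannot modify the internal list.
    return new ArrayList<>(sequence ? seq : unseq);
  }

  @Override
  void add(T file, boolean sequence) {
    (sequence ? seq : unseq).add(file);
  }

  @Override
  int size(boolean sequence) {
    return (sequence ? seq : unseq).size();
  }
}
```

       A documented contract like this lets a second implementation be written against the comments rather than against LevelTsFileManagement.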

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement.level;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelTsFileManagement extends TsFileManagement {
+
+  private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+  private int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final List<List<TsFileResource>> sequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> unSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
+  private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
+
+  public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+    super(storageGroupName, storageGroupDir);
+    clear();
+  }
+
+  private void deleteLevelFiles(List<TsFileResource> vmMergeTsFiles) {
+    logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+    for (TsFileResource vmMergeTsFile : vmMergeTsFiles) {
+      deleteLevelFile(vmMergeTsFile);
+    }
+    for (int i = 0; i < maxLevelNum; i++) {
+      sequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+      unSequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+    }

Review comment:
       Why do we need to iterate over every level to remove vmMergeTsFiles? As far as I understand, they should all be on the same level.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement.level;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelTsFileManagement extends TsFileManagement {
+
+  private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+  private int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final List<List<TsFileResource>> sequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> unSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
+  private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
+
+  public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+    super(storageGroupName, storageGroupDir);
+    clear();
+  }
+
+  private void deleteLevelFiles(List<TsFileResource> vmMergeTsFiles) {
+    logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+    for (TsFileResource vmMergeTsFile : vmMergeTsFiles) {
+      deleteLevelFile(vmMergeTsFile);
+    }
+    for (int i = 0; i < maxLevelNum; i++) {
+      sequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+      unSequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+    }
+  }
+
+  private static void deleteLevelFile(TsFileResource seqFile) {
+    seqFile.writeLock();
+    try {
+      ChunkMetadataCache.getInstance().remove(seqFile);
+      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+      seqFile.setDeleted(true);
+      if (seqFile.getTsFile().exists()) {
+        Files.delete(seqFile.getTsFile().toPath());
+      }
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+    } finally {
+      seqFile.writeUnlock();
+    }
+  }
+
+  private void flushAllFilesToLastLevel(List<List<TsFileResource>> currMergeFiles,
+      HotCompactionLogger hotCompactionLogger, boolean sequence,
+      ReadWriteLock hotCompactionMergeLock) throws IOException {
+    TsFileResource sourceFile = currMergeFiles.get(0).get(0);
+    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1);
+    TsFileResource targetResource = new TsFileResource(newTargetFile);
+    HotCompactionUtils.merge(targetResource, getTsFileList(sequence),
+        storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);

Review comment:
       It seems that we should use currMergeFiles instead of `getTsFileList(sequence)`.
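
       One possible reading of this suggestion, as a sketch: build the merge source from the lists passed in, so that the files merged are the ones from currMergeFiles. MergeSourceSketch and flatten are hypothetical names, and T stands in for TsFileResource. The iteration order copies getTsFileList in the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeSourceSketch {

  // Flattens the per-level lists handed to the merge into a single source list.
  static <T> List<T> flatten(List<List<T>> currMergeFiles) {
    List<T> result = new ArrayList<>();
    // Same order as getTsFileList in the diff: highest level first, level 0 last.
    for (int i = currMergeFiles.size() - 1; i >= 0; i--) {
      result.addAll(currMergeFiles.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> currMergeFiles = List.of(List.of("l0-a", "l0-b"), List.of("l1-a"));
    System.out.println(flatten(currMergeFiles));
  }
}
```

       The result could then be passed to HotCompactionUtils.merge in place of getTsFileList(sequence), assuming that is what the comment intends.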

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1376,6 +1384,7 @@ public void delete(String deviceId, String measurementId, long startTime, long e
 
       // write log to impacted working TsFileProcessors
       logDeletion(startTime, endTime, deviceId, measurementId);
+      logDeletion(startTime, endTime, deviceId, measurementId);

Review comment:
       Why do we need to call logDeletion twice?







[GitHub] [incubator-iotdb] zhanglingzhe0820 commented on a change in pull request #1597: add tsfilemanage

Posted by GitBox <gi...@apache.org>.
zhanglingzhe0820 commented on a change in pull request #1597:
URL: https://github.com/apache/incubator-iotdb/pull/1597#discussion_r468562430



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement.level;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelTsFileManagement extends TsFileManagement {
+
+  private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+  private int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final List<List<TsFileResource>> sequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> unSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
+  private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
+
+  public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+    super(storageGroupName, storageGroupDir);
+    clear();
+  }
+
+  private void deleteLevelFiles(List<TsFileResource> vmMergeTsFiles) {
+    logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+    for (TsFileResource vmMergeTsFile : vmMergeTsFiles) {
+      deleteLevelFile(vmMergeTsFile);
+    }
+    for (int i = 0; i < maxLevelNum; i++) {
+      sequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+      unSequenceTsFileResources.get(i).removeAll(vmMergeTsFiles);
+    }
+  }
+
+  private static void deleteLevelFile(TsFileResource seqFile) {
+    seqFile.writeLock();
+    try {
+      ChunkMetadataCache.getInstance().remove(seqFile);
+      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+      seqFile.setDeleted(true);
+      if (seqFile.getTsFile().exists()) {
+        Files.delete(seqFile.getTsFile().toPath());
+      }
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+    } finally {
+      seqFile.writeUnlock();
+    }
+  }
+
+  private void flushAllFilesToLastLevel(List<List<TsFileResource>> currMergeFiles,
+      HotCompactionLogger hotCompactionLogger, boolean sequence,
+      ReadWriteLock hotCompactionMergeLock) throws IOException {
+    TsFileResource sourceFile = currMergeFiles.get(0).get(0);
+    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1);
+    TsFileResource targetResource = new TsFileResource(newTargetFile);
+    HotCompactionUtils.merge(targetResource, getTsFileList(sequence),
+        storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);
+    hotCompactionLogger.logFullMerge();
+    hotCompactionLogger.logSequence(sequence);
+    hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
+    hotCompactionMergeLock.writeLock().lock();
+    for (int i = 0; i < maxLevelNum - 1; i++) {
+      deleteLevelFiles(currMergeFiles.get(i));
+    }
+    hotCompactionMergeLock.writeLock().unlock();
+    hotCompactionLogger.logMergeFinish();
+  }
+
+  @Override
+  public List<TsFileResource> getMergeTsFileList(boolean sequence) {
+    if (sequence) {
+      return sequenceTsFileResources.get(maxLevelNum - 1);
+    } else {
+      return unSequenceTsFileResources.get(maxLevelNum - 1);
+    }
+  }
+
+  @Override
+  public List<TsFileResource> getTsFileList(boolean sequence) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      for (int i = sequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(sequenceTsFileResources.get(i));
+      }
+    } else {
+      for (int i = unSequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(unSequenceTsFileResources.get(i));
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public Iterator<TsFileResource> getIterator(boolean sequence) {
+    return getTsFileList(sequence).iterator();
+  }
+
+  @Override
+  public void remove(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        sequenceTsFileResource.remove(tsFileResource);
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        unSequenceTsFileResource.remove(tsFileResource);
+      }
+    }
+  }
+
+  @Override
+  public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        sequenceTsFileResource.removeAll(tsFileResourceList);
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        unSequenceTsFileResource.removeAll(tsFileResourceList);
+      }
+    }
+  }
+
+  @Override
+  public void add(TsFileResource tsFileResource, boolean sequence) {
+    int level = getMergeLevel(tsFileResource.getTsFile());
+    if (level <= maxLevelNum - 1) {
+      if (sequence) {
+        sequenceTsFileResources.get(level).add(tsFileResource);
+      } else {
+        unSequenceTsFileResources.get(level).add(tsFileResource);
+      }
+    } else {
+      if (sequence) {
+        sequenceTsFileResources.get(maxLevelNum).add(tsFileResource);
+      } else {
+        unSequenceTsFileResources.get(maxLevelNum).add(tsFileResource);
+      }
+    }
+  }
+
+  @Override
+  public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      add(tsFileResource, sequence);
+    }
+  }
+
+  @Override
+  public void addMerged(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      sequenceTsFileResources.get(maxLevelNum - 1).add(tsFileResource);
+    } else {
+      unSequenceTsFileResources.get(maxLevelNum - 1).add(tsFileResource);
+    }
+  }
+
+  @Override
+  public void addMergedAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    if (sequence) {
+      sequenceTsFileResources.get(maxLevelNum - 1).addAll(tsFileResourceList);
+    } else {
+      unSequenceTsFileResources.get(maxLevelNum - 1).addAll(tsFileResourceList);
+    }
+  }
+
+  @Override
+  public boolean contains(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (sequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (unSequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void clear() {
+    sequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum + 1; i++) {
+      sequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+    unSequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum + 1; i++) {
+      unSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+  }
+
+  @Override
+  public boolean isEmpty(boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (!sequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (!unSequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int size(boolean sequence) {
+    int result = 0;
+    if (sequence) {
+      for (int i = sequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result += sequenceTsFileResources.size();
+      }
+    } else {
+      for (int i = unSequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result += unSequenceTsFileResources.size();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * recover files
+   */
+  @Override
+  public void recover() {
+    File logFile = FSFactoryProducer.getFSFactory()
+        .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
+    try {
+      if (logFile.exists()) {
+        HotCompactionLogAnalyzer logAnalyzer = new HotCompactionLogAnalyzer(logFile);
+        logAnalyzer.analyze();
+        Set<String> deviceSet = logAnalyzer.getDeviceSet();
+        List<File> sourceFileList = logAnalyzer.getSourceFiles();
+        long offset = logAnalyzer.getOffset();
+        File targetFile = logAnalyzer.getTargetFile();
+        boolean isMergeFinished = logAnalyzer.isMergeFinished();
+        boolean fullMerge = logAnalyzer.isFullMerge();
+        boolean isSeq = logAnalyzer.isSeq();
+        if (targetFile == null) {
+          return;
+        }
+        if (fullMerge) {
+          if (!isMergeFinished) {
+            RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+            writer.getIOWriterOut().truncate(offset - 1);
+            writer.close();
+            HotCompactionUtils
+                .merge(new TsFileResource(targetFile), getTsFileList(isSeq), storageGroupName,
+                    new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
+            if (isSeq) {
+              for (List<TsFileResource> currMergeFile : sequenceTsFileResources) {
+                deleteLevelFiles(currMergeFile);
+              }
+            } else {
+              for (List<TsFileResource> currMergeFile : unSequenceTsFileResources) {
+                deleteLevelFiles(currMergeFile);
+              }
+            }
+          }
+        } else {
+          TsFileResource targetResource = new TsFileResource(targetFile);
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+          if (sourceFileList.isEmpty()) {
+            return;
+          }
+          int level = getMergeLevel(sourceFileList.get(0));
+          if (!isMergeFinished) {
+            if (deviceSet.isEmpty()) {
+              Files.delete(targetFile.toPath());
+            } else {
+              writer.getIOWriterOut().truncate(offset - 1);
+              writer.close();
+              if (isSeq) {
+                HotCompactionUtils
+                    .merge(targetResource, sequenceTsFileResources.get(level),
+                        storageGroupName,
+                        new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+                        true);
+                deleteLevelFiles(sequenceTsFileResources.get(level));
+                sequenceTsFileResources.get(level + 1).add(targetResource);
+              } else {
+                HotCompactionUtils
+                    .merge(targetResource, unSequenceTsFileResources.get(level),
+                        storageGroupName,
+                        new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+                        false);
+                deleteLevelFiles(unSequenceTsFileResources.get(level));
+                unSequenceTsFileResources.get(level + 1).add(targetResource);
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      logger.error("recover vm error ", e);
+    } finally {
+      if (logFile.exists()) {
+        try {
+          Files.delete(logFile.toPath());
+        } catch (IOException e) {
+          logger.error("delete vm log file error ", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void forkCurrentFileList() {
+    forkTsFileList(forkedSequenceTsFileResources, sequenceTsFileResources);
+    forkTsFileList(forkedUnSequenceTsFileResources, unSequenceTsFileResources);
+  }
+
+  private void forkTsFileList(List<List<TsFileResource>> forkedSequenceTsFileResources,
+      List<List<TsFileResource>> sequenceTsFileResources) {
+    forkedSequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum - 1; i++) {

Review comment:
       No, maxLevelNum - 1 is the last level of hot compaction, and maxLevelNum is the level that holds files after all merges have completed.
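
       The layout can be sketched as follows. It is based on clear() and add() in the diff: clear() allocates maxLevelNum + 1 lists, and add() routes any file whose level exceeds maxLevelNum - 1 to index maxLevelNum. LevelLayoutSketch, newLevels and slotFor are hypothetical names, and T stands in for TsFileResource.

```java
import java.util.ArrayList;
import java.util.List;

final class LevelLayoutSketch {

  // Allocates maxLevelNum + 1 per-level lists, as clear() does in the diff:
  // indexes 0 .. maxLevelNum - 1 are hot compaction levels,
  // index maxLevelNum is the extra slot for fully merged files.
  static <T> List<List<T>> newLevels(int maxLevelNum) {
    List<List<T>> levels = new ArrayList<>();
    for (int i = 0; i < maxLevelNum + 1; i++) {
      levels.add(new ArrayList<>());
    }
    return levels;
  }

  // Mirrors the routing in add(): a level beyond the last hot compaction
  // level is stored at index maxLevelNum.
  static int slotFor(int fileLevel, int maxLevelNum) {
    return fileLevel <= maxLevelNum - 1 ? fileLevel : maxLevelNum;
  }

  public static void main(String[] args) {
    int maxLevelNum = 3;
    List<List<String>> levels = newLevels(maxLevelNum);
    levels.get(slotFor(1, maxLevelNum)).add("level1.tsfile");
    levels.get(slotFor(7, maxLevelNum)).add("merged.tsfile");
    System.out.println(levels);
  }
}
```

       Under this layout, a loop bounded by i < maxLevelNum - 1 visits levels 0 through maxLevelNum - 2 and leaves the last hot compaction level and the extra slot untouched.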



