You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/08/11 13:04:03 UTC

[GitHub] [incubator-iotdb] zhanglingzhe0820 commented on a change in pull request #1597: add tsfilemanage

zhanglingzhe0820 commented on a change in pull request #1597:
URL: https://github.com/apache/incubator-iotdb/pull/1597#discussion_r468562430



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.tsfilemanagement.level;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
+import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelTsFileManagement extends TsFileManagement {
+
+  private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+  private int maxLevelNum = IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final List<List<TsFileResource>> sequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> unSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
+  private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
+
+  /**
+   * Creates the manager and sets up its per-level lists by calling {@link #clear()}.
+   *
+   * @param storageGroupName name of the storage group, handed to the superclass
+   * @param storageGroupDir directory of the storage group, handed to the superclass
+   */
+  public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+    super(storageGroupName, storageGroupDir);
+    this.clear();
+  }
+
+  /**
+   * Deletes each given file through {@link #deleteLevelFile(TsFileResource)} and afterwards
+   * removes the same resources from levels {@code 0 .. maxLevelNum - 1} of both the sequence
+   * and the unsequence lists.
+   *
+   * @param vmMergeTsFiles the resources to delete and to drop from the level lists
+   */
+  private void deleteLevelFiles(List<TsFileResource> vmMergeTsFiles) {
+    logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+    for (TsFileResource mergedFile : vmMergeTsFiles) {
+      deleteLevelFile(mergedFile);
+    }
+    // note: the list at index maxLevelNum is not visited by this loop
+    int level = 0;
+    while (level < maxLevelNum) {
+      sequenceTsFileResources.get(level).removeAll(vmMergeTsFiles);
+      unSequenceTsFileResources.get(level).removeAll(vmMergeTsFiles);
+      level++;
+    }
+  }
+
+  /**
+   * Removes one file while holding the resource's write lock: the resource is removed from
+   * {@code ChunkMetadataCache}, {@code FileReaderManager.closeFileAndRemoveReader} is called
+   * for its path, it is flagged as deleted, and the file is deleted from disk if it exists.
+   * Any exception is logged and not rethrown; the lock is always released.
+   *
+   * @param seqFile the resource whose file should be removed
+   */
+  private static void deleteLevelFile(TsFileResource seqFile) {
+    seqFile.writeLock();
+    try {
+      ChunkMetadataCache.getInstance().remove(seqFile);
+      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+      seqFile.setDeleted(true);
+      File tsFile = seqFile.getTsFile();
+      if (tsFile.exists()) {
+        Files.delete(tsFile.toPath());
+      }
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+    } finally {
+      seqFile.writeUnlock();
+    }
+  }
+
+  /**
+   * Merges every file returned by {@link #getTsFileList(boolean)} into one new target file and
+   * then deletes the source files of levels {@code 0 .. maxLevelNum - 2}.
+   *
+   * <p>The target file name is built by {@code createNewTsFileName} from the first file of the
+   * first level in {@code currMergeFiles} and the level {@code maxLevelNum - 1}.
+   *
+   * @param currMergeFiles per-level lists of the files taking part in the merge; level 0 must
+   *     contain at least one file
+   * @param hotCompactionLogger logger that records the progress of the merge
+   * @param sequence {@code true} when sequence files are merged, {@code false} for unsequence
+   * @param hotCompactionMergeLock lock whose write lock is held while the files are deleted
+   * @throws IOException if merging or logging fails
+   */
+  private void flushAllFilesToLastLevel(List<List<TsFileResource>> currMergeFiles,
+      HotCompactionLogger hotCompactionLogger, boolean sequence,
+      ReadWriteLock hotCompactionMergeLock) throws IOException {
+    TsFileResource sourceFile = currMergeFiles.get(0).get(0);
+    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), maxLevelNum - 1);
+    TsFileResource targetResource = new TsFileResource(newTargetFile);
+    HotCompactionUtils.merge(targetResource, getTsFileList(sequence),
+        storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);
+    hotCompactionLogger.logFullMerge();
+    hotCompactionLogger.logSequence(sequence);
+    hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
+    hotCompactionMergeLock.writeLock().lock();
+    try {
+      for (int i = 0; i < maxLevelNum - 1; i++) {
+        deleteLevelFiles(currMergeFiles.get(i));
+      }
+    } finally {
+      // release the lock even if deleting fails, otherwise later lock attempts block forever
+      hotCompactionMergeLock.writeLock().unlock();
+    }
+    hotCompactionLogger.logMergeFinish();
+  }
+
+  /**
+   * Returns the list stored at level {@code maxLevelNum - 1}.
+   *
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   * @return the managed list itself, not a copy
+   */
+  @Override
+  public List<TsFileResource> getMergeTsFileList(boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    return levels.get(maxLevelNum - 1);
+  }
+
+  @Override
+  public List<TsFileResource> getTsFileList(boolean sequence) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      for (int i = sequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(sequenceTsFileResources.get(i));
+      }
+    } else {
+      for (int i = unSequenceTsFileResources.size() - 1; i >= 0; i--) {
+        result.addAll(unSequenceTsFileResources.get(i));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns an iterator over the list produced by {@link #getTsFileList(boolean)}.
+   *
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   * @return an iterator over a freshly built list of all resources
+   */
+  @Override
+  public Iterator<TsFileResource> getIterator(boolean sequence) {
+    List<TsFileResource> allFiles = getTsFileList(sequence);
+    return allFiles.iterator();
+  }
+
+  /**
+   * Removes the given resource from every level of the selected kind.
+   *
+   * @param tsFileResource the resource to remove
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void remove(TsFileResource tsFileResource, boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    for (List<TsFileResource> levelFiles : levels) {
+      levelFiles.remove(tsFileResource);
+    }
+  }
+
+  /**
+   * Removes all given resources from every level of the selected kind.
+   *
+   * @param tsFileResourceList the resources to remove
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    for (List<TsFileResource> levelFiles : levels) {
+      levelFiles.removeAll(tsFileResourceList);
+    }
+  }
+
+  /**
+   * Adds the resource to the level reported by {@code getMergeLevel} for its file. Any level
+   * greater than {@code maxLevelNum - 1} is stored in the list at index {@code maxLevelNum}.
+   *
+   * @param tsFileResource the resource to register
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void add(TsFileResource tsFileResource, boolean sequence) {
+    // capping at maxLevelNum is the same as the "level <= maxLevelNum - 1" branch pair
+    int targetLevel = Math.min(getMergeLevel(tsFileResource.getTsFile()), maxLevelNum);
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    levels.get(targetLevel).add(tsFileResource);
+  }
+
+  /**
+   * Adds every resource of the list, one by one, through {@link #add(TsFileResource, boolean)}.
+   *
+   * @param tsFileResourceList the resources to register
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    Iterator<TsFileResource> pending = tsFileResourceList.iterator();
+    while (pending.hasNext()) {
+      add(pending.next(), sequence);
+    }
+  }
+
+  /**
+   * Appends the resource to level {@code maxLevelNum - 1} of the selected kind.
+   *
+   * @param tsFileResource the resource to register
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void addMerged(TsFileResource tsFileResource, boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    levels.get(maxLevelNum - 1).add(tsFileResource);
+  }
+
+  /**
+   * Appends all given resources to level {@code maxLevelNum - 1} of the selected kind.
+   *
+   * @param tsFileResourceList the resources to register
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   */
+  @Override
+  public void addMergedAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    levels.get(maxLevelNum - 1).addAll(tsFileResourceList);
+  }
+
+  @Override
+  public boolean contains(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (sequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (unSequenceTsFileResource.contains(tsFileResource)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Drops all levels and recreates {@code maxLevelNum + 1} empty level lists, first for the
+   * sequence files and then for the unsequence files.
+   */
+  @Override
+  public void clear() {
+    final int levelCount = maxLevelNum + 1;
+
+    sequenceTsFileResources.clear();
+    for (int level = 0; level < levelCount; level++) {
+      sequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+
+    unSequenceTsFileResources.clear();
+    for (int level = 0; level < levelCount; level++) {
+      unSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+    }
+  }
+
+  @Override
+  public boolean isEmpty(boolean sequence) {
+    if (sequence) {
+      for (List<TsFileResource> sequenceTsFileResource : sequenceTsFileResources) {
+        if (!sequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    } else {
+      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources) {
+        if (!unSequenceTsFileResource.isEmpty()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Counts the resources held across all levels of the selected kind.
+   *
+   * @param sequence {@code true} for the sequence levels, {@code false} for the unsequence ones
+   * @return the total number of resources in all levels
+   */
+  @Override
+  public int size(boolean sequence) {
+    List<List<TsFileResource>> levels =
+        sequence ? sequenceTsFileResources : unSequenceTsFileResources;
+    int result = 0;
+    for (List<TsFileResource> levelFiles : levels) {
+      // add the number of files in this level, not the number of levels
+      result += levelFiles.size();
+    }
+    return result;
+  }
+
+  /**
+   * Recovers an interrupted hot compaction from the compaction log of this storage group.
+   *
+   * <p>If the log file exists it is analyzed. When it names a target file and the merge did
+   * not finish, the target is truncated to {@code offset - 1} and the merge is run again with
+   * the device set read from the log:
+   * <ul>
+   *   <li>full merge: all files of the selected kind are merged and every level is deleted;</li>
+   *   <li>level merge: the files of the source level are merged, deleted, and the target is
+   *       added to the next level. If the log contains no device, the target file is simply
+   *       deleted.</li>
+   * </ul>
+   * {@link IOException}s are logged, not rethrown. The log file is deleted at the end in every
+   * case.
+   */
+  @Override
+  public void recover() {
+    File logFile = FSFactoryProducer.getFSFactory()
+        .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
+    try {
+      if (!logFile.exists()) {
+        return;
+      }
+      HotCompactionLogAnalyzer logAnalyzer = new HotCompactionLogAnalyzer(logFile);
+      logAnalyzer.analyze();
+      Set<String> deviceSet = logAnalyzer.getDeviceSet();
+      List<File> sourceFileList = logAnalyzer.getSourceFiles();
+      long offset = logAnalyzer.getOffset();
+      File targetFile = logAnalyzer.getTargetFile();
+      boolean isMergeFinished = logAnalyzer.isMergeFinished();
+      boolean fullMerge = logAnalyzer.isFullMerge();
+      boolean isSeq = logAnalyzer.isSeq();
+      if (targetFile == null) {
+        return;
+      }
+      List<List<TsFileResource>> levels =
+          isSeq ? sequenceTsFileResources : unSequenceTsFileResources;
+      if (fullMerge) {
+        if (isMergeFinished) {
+          return;
+        }
+        truncateTargetAndClose(new RestorableTsFileIOWriter(targetFile), offset);
+        HotCompactionUtils
+            .merge(new TsFileResource(targetFile), getTsFileList(isSeq), storageGroupName,
+                new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
+        for (List<TsFileResource> currMergeFile : levels) {
+          deleteLevelFiles(currMergeFile);
+        }
+        return;
+      }
+      TsFileResource targetResource = new TsFileResource(targetFile);
+      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+      if (sourceFileList.isEmpty()) {
+        // nothing to redo; release the writer instead of leaking it
+        writer.close();
+        return;
+      }
+      int level = getMergeLevel(sourceFileList.get(0));
+      if (isMergeFinished) {
+        writer.close();
+        return;
+      }
+      if (deviceSet.isEmpty()) {
+        // close before deleting so no open handle is left on the removed file
+        writer.close();
+        Files.delete(targetFile.toPath());
+        return;
+      }
+      truncateTargetAndClose(writer, offset);
+      HotCompactionUtils
+          .merge(targetResource, levels.get(level), storageGroupName,
+              new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
+      deleteLevelFiles(levels.get(level));
+      levels.get(level + 1).add(targetResource);
+    } catch (IOException e) {
+      logger.error("recover vm error ", e);
+    } finally {
+      if (logFile.exists()) {
+        try {
+          Files.delete(logFile.toPath());
+        } catch (IOException e) {
+          logger.error("delete vm log file error ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Truncates the writer's output to {@code offset - 1} and closes the writer, even when the
+   * truncation fails.
+   *
+   * @param writer the writer opened on the merge target file
+   * @param offset the offset read from the compaction log
+   * @throws IOException if truncating or closing fails
+   */
+  private static void truncateTargetAndClose(RestorableTsFileIOWriter writer, long offset)
+      throws IOException {
+    try {
+      writer.getIOWriterOut().truncate(offset - 1);
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Refreshes both forked lists from the current level lists by calling
+   * {@code forkTsFileList}, first for the sequence files and then for the unsequence files.
+   */
+  @Override
+  public void forkCurrentFileList() {
+    this.forkTsFileList(this.forkedSequenceTsFileResources, this.sequenceTsFileResources);
+    this.forkTsFileList(this.forkedUnSequenceTsFileResources, this.unSequenceTsFileResources);
+  }
+
+  private void forkTsFileList(List<List<TsFileResource>> forkedSequenceTsFileResources,
+      List<List<TsFileResource>> sequenceTsFileResources) {
+    forkedSequenceTsFileResources.clear();
+    for (int i = 0; i < maxLevelNum - 1; i++) {

Review comment:
       No: maxLevelNum - 1 is the last level of hot compaction, and maxLevelNum is the level for files whose full merge has completed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org