You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/24 06:11:04 UTC

[incubator-iotdb] branch refactor_overflow updated: update merge

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_overflow by this push:
     new ffedb93  update merge
ffedb93 is described below

commit ffedb9368ed1bc39f3140e42d0fad5879d088ac2
Author: 江天 <jt...@163.com>
AuthorDate: Fri May 24 14:09:15 2019 +0800

    update merge
---
 .../db/engine/sgmanager/StorageGroupProcessor.java | 154 +++++++++++++++++++--
 .../db/engine/tsfiledata/TsFileProcessor.java      |  46 +++++-
 2 files changed, 185 insertions(+), 15 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 77fc48d..c658b8e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -21,8 +21,13 @@ package org.apache.iotdb.db.engine.sgmanager;
 
 import static java.time.ZonedDateTime.ofInstant;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -44,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.Processor;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.merge.MergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
@@ -92,7 +98,12 @@ import org.slf4j.LoggerFactory;
 public class StorageGroupProcessor extends Processor implements IStatistic {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupProcessor.class);
+
+  public static final String MERGE_TEMP_SUFFIX = ".merging";
+  private static final String OLD_FILE_RECORD = "MERGE_OLD_FILES";
+
   private final String statStorageGroupName;
+  private String fileNodeDir;
 
   private TsFileProcessor tsFileProcessor;
   private OverflowProcessor overflowProcessor;
@@ -148,6 +159,13 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
       LOGGER.info("The directory of the filenode processor {} doesn't exist. Create new directory {}",
           getProcessorName(), systemFolder.getAbsolutePath());
     }
+    fileNodeDir = systemFolder.getAbsolutePath();
+    try {
+      cleanLastMerge();
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+
     //the version controller is shared by tsfile and overflow processor.
     try {
       versionController = new SimpleFileVersionController(systemFolder.getAbsolutePath());
@@ -582,10 +600,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
         lastUnseqIndex = currUnseqGroup.size();
       }
 
-      if (currUnseqGroup.size() > 0) {
-        seqGroups.add(currSeqGroup);
-        unseqGroups.add(currUnseqGroup);
-      }
+      seqGroups.add(currSeqGroup);
+      unseqGroups.add(currUnseqGroup);
     }
     return new Pair<>(seqGroups, unseqGroups);
   }
@@ -626,6 +642,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
   public void merge() throws StorageGroupProcessorException {
     writeLock();
     Pair<List<List<TsFileResource>>, List<List<TsFileResource>>> fileGroups;
+    List<TsFileResource> unseqFiles;
+    List<TsFileResource> seqFiles;
     try {
       // close seqFile and overflow(unseqFile), prepare for merge
       LOGGER.info("The StorageGroupProcessor {} begins to merge.", processorName);
@@ -634,8 +652,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
       isMerging = true;
 
       // find what files are to be processed in this merge
-      List<TsFileResource> seqFiles = tsFileProcessor.getTsFileResources();
-      List<TsFileResource> unseqFiles = overflowProcessor.getTsFileResources();
+      seqFiles = new ArrayList<>(tsFileProcessor.getTsFileResources());
+      unseqFiles = new ArrayList<>(overflowProcessor.getTsFileResources());
 
       fileGroups = groupMergeFiles(seqFiles, unseqFiles);
     } catch (TsFileProcessorException e) {
@@ -650,6 +668,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
 
     LOGGER.info("{}: There are {} groups to be merged", processorName, seqGroups.size());
 
+
     for (int i = 0; i < seqGroups.size(); i++) {
       List<TsFileResource> seqGroup = seqGroups.get(i);
       List<TsFileResource> unseqGroup = unseqGroups.get(i);
@@ -661,21 +680,97 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
       LOGGER.info("{}: Starting merging the {} group, there are {} seq groups and {} unseq groups",
           processorName, i, seqFileSize, unseqFileSize);
       TsFileResource newResource = mergeFileGroup(seqGroup, unseqGroup);
+      try {
+        addMergedFile(newResource, seqGroup, unseqGroup);
+      } catch (IOException e) {
+        LOGGER.error("Cannot merge the {} group because:", e);
+        continue;
+      }
       long endTime = System.currentTimeMillis();
+      LOGGER.info("{}: End merging the {} group after {}ms", processorName, i, endTime - startTime);
     }
+  }
 
+  private void waitForQueriesOnOldFile() {
+    long sleepTime = 1000;
+    long maxSleepTime = 60 * 1000L;
+    while (!oldMultiPassTokenSet.isEmpty()) {
+      try {
+        Thread.sleep(sleepTime);
+        sleepTime = sleepTime * 2 > maxSleepTime ? maxSleepTime : sleepTime * 2;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("{}: unexpected interuption when waiting for queries on old files",
+            processorName, e);
+      }
+    }
+  }
 
-    // change status from merge to wait
-    switchMergeToWaiting(backupIntervalFiles, needEmtpy);
+  private void addMergedFile(TsFileResource mergedFile, List<TsFileResource> seqGroup,
+      List<TsFileResource> unseqGroup) throws IOException {
+    writeLock();
+    try {
+      // record the old files of this merge so that they can be deleted after a system crash
+      recordOldFiles(mergedFile, seqGroup, unseqGroup);
+      // update file lists of seqProcessor and unseqProcessor
+      tsFileProcessor.replaceFiles(seqGroup, mergedFile);
+      overflowProcessor.replaceFiles(unseqGroup, null);
+      // rename the new file so that it will not be deleted after a system crash
+      String newName = mergedFile.getFile().getName().replace(MERGE_TEMP_SUFFIX, "");
+      File newFile = new File(mergedFile.getFile().getParent(), newName);
+      //noinspection ResultOfMethodCallIgnored
+      newFile.delete();
+      if (!mergedFile.getFile().renameTo(newFile)) {
+        throw new IOException(String.format("Cannot rename merged file %s",
+            mergedFile.getFilePath()));
+      }
+      // now new queries will no longer use old files, so record new queries in another token set
+      oldMultiPassTokenSet = newMultiPassTokenSet;
+      newMultiPassTokenSet = new HashSet<>();
+    } finally {
+      writeUnlock();
+    }
+    // wait until all old queries end
+    waitForQueriesOnOldFile();
+    // remove old files
+    for (TsFileResource oldFile : seqGroup) {
+      Files.deleteIfExists(oldFile.getFile().toPath());
+    }
+    for (TsFileResource oldFile : unseqGroup) {
+      Files.deleteIfExists(oldFile.getFile().toPath());
+    }
+    // remove old file records
+    Files.deleteIfExists(getMergeFileRecord().toPath());
+  }
+
+  private File getMergeFileRecord() {
+    return new File(fileNodeDir, OLD_FILE_RECORD);
+  }
 
-    // change status from wait to work
-    switchWaitingToWorking();
+  private void recordOldFiles(TsFileResource newFile,
+      List<TsFileResource> seqGroup, List<TsFileResource> unseqGroup)
+      throws IOException {
+    File oldFileRecord = getMergeFileRecord();
+    try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(oldFileRecord, true))) {
+      for (TsFileResource resource : seqGroup) {
+        bufferedWriter.write(resource.getFilePath());
+        bufferedWriter.write("\n");
+      }
+      for (TsFileResource resource : unseqGroup) {
+        bufferedWriter.write(resource.getFilePath());
+        bufferedWriter.write("\n");
+      }
+      bufferedWriter.write(newFile.getFilePath());
+    }
   }
 
   private TsFileResource mergeFileGroup(List<TsFileResource> seqGroup, List<TsFileResource> unseqGroup)
       throws StorageGroupProcessorException {
+    if (unseqGroup.isEmpty()) {
+      return seqGroup.get(0);
+    }
     // find all timeseries (possibly) of each device (the key) contained in these files.
-    Map<String, List<Path>> pathMap = null;
+    Map<String, List<Path>> pathMap;
     try {
       pathMap = resolveMergePaths(seqGroup, unseqGroup);
     } catch (PathErrorException e) {
@@ -686,7 +781,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
 
     mergeBaseDir = Directories.getInstance().getNextFolderForTsfile();
     mergeFileName = minimumTime
-        + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis();
+        + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis()
+        + MERGE_TEMP_SUFFIX;
     mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
         mergeFileName);
     mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
@@ -923,4 +1019,38 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
     }
   }
 
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  private void cleanLastMerge() throws IOException {
+    // if the system crashed during the last merge, there may be undeleted old files;
+    // these files will be deleted in this method
+    File mergeRecord = getMergeFileRecord();
+    List<String> filePaths = new ArrayList<>();
+    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(mergeRecord))) {
+      bufferedReader.lines().forEach(line -> filePaths.add(line));
+    }
+    // the last file should be the merged new file
+    String lastFilePath = filePaths.get(filePaths.size() - 1);
+    if (!lastFilePath.endsWith(MERGE_TEMP_SUFFIX)) {
+      // the record is incomplete, consider last merge failed, do not delete any file
+      LOGGER.warn("The merge record is not complete, the last merge seems failed");
+      mergeRecord.delete();
+      return;
+    }
+    File mergedTempFile = new File(lastFilePath);
+    if (mergedTempFile.exists()) {
+      // the temp file has not been renamed, rename it
+      File newName = new File(mergedTempFile.getParent(),
+          mergedTempFile.getName().replace(MERGE_TEMP_SUFFIX, ""));
+      if (!mergedTempFile.renameTo(newName)) {
+        LOGGER.error("Cannot rename merged temp file {}, abort last merge",
+            mergedTempFile.getAbsolutePath());
+        mergeRecord.delete();
+        return;
+      }
+    }
+    // the merged temp file has been renamed, the old files can be safely deleted
+    for (String filePath : filePaths) {
+      new File(filePath).delete();
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index a06cd76..d822edb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
 import org.apache.iotdb.db.engine.sgmanager.OperationResult;
+import org.apache.iotdb.db.engine.sgmanager.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -189,7 +190,6 @@ public class TsFileProcessor extends Processor {
   @SuppressWarnings({"ResultOfMethodCallIgnored"})
   private void initResources() throws TsFileProcessorException {
     tsFileResources = new ArrayList<>();
-    inverseIndexOfResource = new HashMap<>();
     lastFlushedTimeForEachDevice = new HashMap<>();
     minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
     maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
@@ -227,7 +227,7 @@ public class TsFileProcessor extends Processor {
     if (unclosedFile == null) {
       unclosedFile = generateNewTsFilePath();
     }
-
+    buildInverseIndex();
     initCurrentTsFile(unclosedFile);
   }
 
@@ -244,6 +244,12 @@ public class TsFileProcessor extends Processor {
         .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
 
     for (File tsfile : tsFiles) {
+      if (tsfile.getName().endsWith(StorageGroupProcessor.MERGE_TEMP_SUFFIX)) {
+        // remove temp file of last failed merge
+        //noinspection ResultOfMethodCallIgnored
+        tsfile.delete();
+        continue;
+      }
       if (!tsfile.getName().equals(unclosedFileName)) {
         addResource(tsfile);
       }
@@ -267,12 +273,20 @@ public class TsFileProcessor extends Processor {
     tsFileResources.add(resource);
     //maintain the inverse index and fileNamePrefix
     for (String device : resource.getDevices()) {
-      inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
       lastFlushedTimeForEachDevice
           .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
     }
   }
 
+  public void buildInverseIndex() {
+    inverseIndexOfResource.clear();
+    for (TsFileResource resource : tsFileResources) {
+      for (String device : resource.getDevices()) {
+        inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
+      }
+    }
+  }
+
 
   private File generateNewTsFilePath() throws TsFileProcessorException {
     String dataDir = getNextDataFolder();
@@ -950,4 +964,30 @@ public class TsFileProcessor extends Processor {
   public List<TsFileResource> getTsFileResources() {
     return tsFileResources;
   }
+
+  /**
+   * Merge method, replaces 'oldFiles' in tsfileResources with 'newFile'. If 'newFile' is null, just
+   * remove oldFiles.
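+   * @param oldFiles files to remove, in the same order as they appear in tsFileResources
+   * @param newFile file that takes the place of the first old file, or null to only remove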
+   */
+  public void replaceFiles(List<TsFileResource> oldFiles, TsFileResource newFile) {
+    List<TsFileResource> newFiles = new ArrayList<>();
+    int j = 0;
+    for (TsFileResource origin : tsFileResources) {
+      TsFileResource toDelete = j < oldFiles.size() ? oldFiles.get(j) : null;
+      if (origin == toDelete) {
+        if (j == 0 && newFile != null) {
+          // replace the first old file with the new file
+          newFiles.add(newFile);
+        }
+        // this file should be deleted, do not add it to new files
+        j++;
+      } else {
+        // this file is not one of the old files, keep it
+        newFiles.add(origin);
+      }
+    }
+    tsFileResources = newFiles;
+  }
 }
\ No newline at end of file