You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/03/23 09:34:42 UTC

[incubator-iotdb] branch partitioned_file_version_management created (now 6758027)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch partitioned_file_version_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 6758027  integrate data partition with file version management

This branch includes the following new commits:

     new 6758027  integrate data partition with file version management

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: integrate data partition with file version management

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch partitioned_file_version_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 67580273f8b9a881aed441f2151b5fc5c6e95013
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Mar 23 17:34:29 2020 +0800

    integrate data partition with file version management
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   6 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 171 +++++++++++++--------
 2 files changed, 110 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index c63f728..a838a54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -549,10 +549,10 @@ public class StorageEngine implements IService {
     this.fileFlushPolicy = fileFlushPolicy;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup) {
-    // TODO-Cluster#350: integrate with time partitioning
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup,
+      long partitionNum) {
     StorageGroupProcessor processor = processorMap.get(storageGroup);
-    return processor != null && processor.isFileAlreadyExist(tsFileResource);
+    return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
   }
 
   public static long getTimePartitionInterval() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 295a074..c490723 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,28 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -40,7 +62,11 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -53,6 +79,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -73,17 +100,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
@@ -203,9 +219,16 @@ public class StorageGroupProcessor {
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
   private TsFileFlushPolicy fileFlushPolicy;
 
-  // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not
-  // including the files generated by merge
-  private Set<Long> allDirectFileVersions = new HashSet<>();
+  /**
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
+   * not including the files generated by merge) of each partition.
+   * As data file close is managed by the leader in the distributed version, the files with the
+   * same version(s) have the same data, despite that the inner structure (the size and
+   * organization of chunks) may be different, so we can easily find what remote files we do not
+   * have locally.
+   * partition number -> version number set
+   */
+  private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   public StorageGroupProcessor(String systemInfoDir, String storageGroupName,
       TsFileFlushPolicy fileFlushPolicy)
@@ -246,14 +269,18 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
       for (TsFileResource resource : unseqTsFiles) {
         //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
@@ -307,17 +334,15 @@ public class StorageGroupProcessor {
    * @return version controller
    */
   private VersionController getVersionControllerByTimePartitionId(long timePartitionId) {
-    VersionController res = timePartitionIdVersionControllerMap.get(timePartitionId);
-    if (res == null) {
-      try {
-        res = new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
-        timePartitionIdVersionControllerMap.put(timePartitionId, res);
-      } catch (IOException e) {
-        logger.error("can't build a version controller for time partition" + timePartitionId);
-      }
-    }
-
-    return res;
+    return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+        id -> {
+          try {
+            return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
+          } catch (IOException e) {
+            logger.error("can't build a version controller for time partition {}", timePartitionId);
+            return null;
+          }
+        });
   }
 
   private List<TsFileResource> getAllFiles(List<String> folders) {
@@ -328,17 +353,20 @@ public class StorageGroupProcessor {
         continue;
       }
 
-      for (File timeRangeFileFolder : fileFolder.listFiles()) {
-        // some TsFileResource may be being persisted when the system crashed, try recovering such
-        // resources
-        continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
+      File[] subFiles = fileFolder.listFiles();
+      if (subFiles != null) {
+        for (File timeRangeFileFolder : subFiles) {
+          // some TsFileResource may be being persisted when the system crashed, try recovering such
+          // resources
+          continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
 
-        // some TsFiles were going to be replaced by the merged files when the system crashed and
-        // the process was interrupted before the merged files could be named
-        continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
+          // some TsFiles were going to be replaced by the merged files when the system crashed and
+          // the process was interrupted before the merged files could be named
+          continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
 
-        Collections.addAll(tsFiles,
-            fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), TSFILE_SUFFIX));
+          Collections.addAll(tsFiles,
+              fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), TSFILE_SUFFIX));
+        }
       }
 
     }
@@ -796,12 +824,12 @@ public class StorageGroupProcessor {
    * @return file name
    */
   private String getNewTsFileName(long timePartitionId) {
-    return getNewTsFileName(System.currentTimeMillis(),
-        getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), 0);
+    long version = getVersionControllerByTimePartitionId(timePartitionId).nextVersion();
+    partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new HashSet<>()).add(version);
+    return getNewTsFileName(System.currentTimeMillis(), version, 0);
   }
 
   private String getNewTsFileName(long time, long version, int mergeCnt) {
-    allDirectFileVersions.add(version);
     return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version
         + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
   }
@@ -1185,22 +1213,7 @@ public class StorageGroupProcessor {
       // time partition to divide storage group
       long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp);
       // write log
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
-        for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-
-        for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-      }
+      logDeletion(timestamp, deviceId, measurementId, timePartitionId);
 
       Path fullPath = new Path(deviceId, measurementId);
       Deletion deletion = new Deletion(fullPath,
@@ -1225,6 +1238,26 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+      throws IOException {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
+      for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode()
+              .write(deletionPlan);
+        }
+      }
+
+      for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode()
+              .write(deletionPlan);
+        }
+      }
+    }
+  }
+
 
   private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion,
       List<ModificationFile> updatedModFiles)
@@ -1559,6 +1592,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource)
       throws TsFileProcessorException {
     File tsfileToBeInserted = newTsFileResource.getFile();
+    long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParent());
     writeLock();
     mergeLock.writeLock().lock();
     try {
@@ -1569,17 +1603,20 @@ public class StorageGroupProcessor {
       // check new tsfile
       outer:
       for (int i = 0; i < sequenceList.size(); i++) {
-        if (sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+        TsFileResource localFile = sequenceList.get(i);
+        if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
           return;
         }
-        if (i == sequenceList.size() - 1 && sequenceList.get(i).getEndTimeMap().isEmpty()) {
+        long localPartitionId = Long.parseLong(localFile.getFile().getParent());
+        if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+            || newFilePartitionId != localPartitionId) {
           continue;
         }
         boolean hasPre = false, hasSubsequence = false;
         for (String device : newTsFileResource.getStartTimeMap().keySet()) {
-          if (sequenceList.get(i).getStartTimeMap().containsKey(device)) {
-            long startTime1 = sequenceList.get(i).getStartTimeMap().get(device);
-            long endTime1 = sequenceList.get(i).getEndTimeMap().get(device);
+          if (localFile.getStartTimeMap().containsKey(device)) {
+            long startTime1 = localFile.getStartTimeMap().get(device);
+            long endTime1 = localFile.getEndTimeMap().get(device);
             long startTime2 = newTsFileResource.getStartTimeMap().get(device);
             long endTime2 = newTsFileResource.getEndTimeMap().get(device);
             if (startTime1 > endTime2) {
@@ -1625,7 +1662,10 @@ public class StorageGroupProcessor {
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
-      allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions());
+      String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
+      long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+      partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+          .addAll(newTsFileResource.getHistoricalVersions());
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1642,8 +1682,10 @@ public class StorageGroupProcessor {
    * If the historical versions of a file is a sub-set of the given file's, remove it to reduce
    * unnecessary merge. Only used when the file sender and the receiver share the same file
    * close policy.
+   * Warning: DO NOT REMOVE
    * @param resource
    */
+  @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
     writeLock();
     closeQueryLock.writeLock().lock();
@@ -1985,8 +2027,9 @@ public class StorageGroupProcessor {
     return storageGroupName;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource) {
-    return allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions());
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
+    return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet())
+        .containsAll(tsFileResource.getHistoricalVersions());
   }
 
   @FunctionalInterface