You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/04/10 10:03:30 UTC

[incubator-iotdb] branch separate_file_version created (now f5edb58)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch separate_file_version
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at f5edb58  separate file version from chunk version

This branch includes the following new commits:

     new f5edb58  separate file version from chunk version

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: separate file version from chunk version

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch separate_file_version
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f5edb584b384e662222714f9e06e87da1f0781f0
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Apr 10 18:03:16 2020 +0800

    separate file version from chunk version
---
 .../engine/storagegroup/StorageGroupProcessor.java | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d3afe72..edc67e8 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -224,6 +224,15 @@ public class StorageGroupProcessor {
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
+  /**
+   * The max file versions in each partition. By recording this, if several IoTDB instances have
+   * the same policy of closing file and their ingestion is identical, then files of the same
+   * version in different IoTDB instance will have identical data, providing convenience for data
+   * comparison across different instances.
+   * partition number -> max version number
+   */
+  private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
+
   public StorageGroupProcessor(String systemDir, String storageGroupName,
       TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
     this.storageGroupName = storageGroupName;
@@ -257,10 +266,12 @@ public class StorageGroupProcessor {
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
       for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
@@ -293,6 +304,12 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
+    long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
+    if (fileVersion > oldVersion) {
+      partitionMaxFileVersions.put(partitionNum, fileVersion);
+    }
+  }
 
   /**
    * get version controller by time partition Id Thread-safety should be ensure by caller
@@ -783,7 +800,8 @@ public class StorageGroupProcessor {
    * @return file name
    */
   private String getNewTsFileName(long timePartitionId) {
-    long version = getVersionControllerByTimePartitionId(timePartitionId).nextVersion();
+    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
+    partitionMaxFileVersions.put(timePartitionId, version);
     partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new HashSet<>()).add(version);
     return getNewTsFileName(System.currentTimeMillis(), version, 0);
   }
@@ -1567,6 +1585,7 @@ public class StorageGroupProcessor {
       long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
+      updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1869,6 +1888,7 @@ public class StorageGroupProcessor {
     }
     partitionDirectFileVersions.computeIfAbsent(filePartitionId,
         p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
+    updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
     return true;
   }