You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/27 03:14:09 UTC

[incubator-iotdb] branch track_file_historical_versions created (now 7548a46)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch track_file_historical_versions
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 7548a46  Track the merge history in TsFileResource

This branch includes the following new commits:

     new 7548a46  Track the merge history in TsFileResource

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: Track the merge history in TsFileResource

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch track_file_historical_versions
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7548a46d7e2e30fb61a546eae066c1ae8713c82a
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Dec 27 11:12:33 2019 +0800

    Track the merge history in TsFileResource
---
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 18 ++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    |  4 +-
 .../db/engine/storagegroup/TsFileResource.java     | 52 +++++++++++++++-------
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 31 +++++++------
 .../writelog/recover/TsFileRecoverPerformer.java   | 22 +++------
 5 files changed, 83 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 7b179fa..3b1cdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -23,9 +23,11 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
@@ -150,6 +152,7 @@ class MergeFileTask {
       }
       oldFileWriter.endFile(new Schema(newFileWriter.getKnownSchema()));
 
+      updateHistoricalVersions(seqFile);
       seqFile.serialize();
       mergeLogger.logFileMergeEnd();
       logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
@@ -167,6 +170,20 @@ class MergeFileTask {
     }
   }
 
+  private void updateHistoricalVersions(TsFileResource seqFile) {
+    // as the new file contains data of other files, track their versions in the new file
+    // so that we will be able to compare data across different IoTDBs that share the same file
+    // generation policy
+    // however, since the data of unseq files are mixed together, we won't be able to know
+    // which files are exactly contained in the new file, so we have to record all unseq files
+    // in the new file
+    Set<Long> newHistoricalVersions = new HashSet<>(seqFile.getHistoricalVersions());
+    for (TsFileResource unseqFiles : resource.getUnseqFiles()) {
+      newHistoricalVersions.addAll(unseqFiles.getHistoricalVersions());
+    }
+    seqFile.setHistoricalVersions(newHistoricalVersions);
+  }
+
   private void writeMergedChunkGroup(ChunkGroupMetaData chunkGroupMetaData,
       TsFileSequenceReader reader, TsFileIOWriter fileWriter)
       throws IOException {
@@ -213,6 +230,7 @@ class MergeFileTask {
 
     fileWriter.endFile(new Schema(fileWriter.getKnownSchema()));
 
+    updateHistoricalVersions(seqFile);
     seqFile.serialize();
     mergeLogger.logFileMergeEnd();
     logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index eec8828..cdf649e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -131,6 +130,9 @@ public class TsFileProcessor {
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
     logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
+
+    // a file generated by flush has only one historical version, which is itself
+    this.tsFileResource.setHistoricalVersions(Collections.singleton(versionController.currVersion()));
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 32817cf..3be1329 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -23,10 +23,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
@@ -66,6 +68,12 @@ public class TsFileResource {
   private volatile boolean deleted = false;
   private volatile boolean isMerging = false;
 
+  // historicalVersions are used to track the merge history of a TsFile. For a TsFile generated
+  // by flush, this field only contains its own version number. For a TsFile generated by merge,
+  // its historicalVersions are the union of all TsFiles' historicalVersions that joined this merge.
+  // This field helps us compare the files that are generated by different IoTDBs that share the
+  // same file generation policy but have their own merge policies.
+  private Set<Long> historicalVersions;
 
   /**
    * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
@@ -96,15 +104,6 @@ public class TsFileResource {
     this.processor = processor;
   }
 
-  public TsFileResource(File file,
-      Map<String, Long> startTimeMap,
-      Map<String, Long> endTimeMap) {
-    this.file = file;
-    this.startTimeMap = startTimeMap;
-    this.endTimeMap = endTimeMap;
-    this.closed = true;
-  }
-
   /**
    * unsealed TsFile
    */
@@ -133,6 +132,13 @@ public class TsFileResource {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
         ReadWriteIOUtils.write(entry.getValue(), outputStream);
       }
+
+      if (historicalVersions != null) {
+        ReadWriteIOUtils.write(this.historicalVersions.size(), outputStream);
+        for (Long historicalVersion : historicalVersions) {
+          ReadWriteIOUtils.write(historicalVersion, outputStream);
+        }
+      }
     }
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
@@ -159,6 +165,14 @@ public class TsFileResource {
       }
       this.startTimeMap = startTimes;
       this.endTimeMap = endTimes;
+
+      if (inputStream.available() > 0) {
+        int versionSize = ReadWriteIOUtils.readInt(inputStream);
+        historicalVersions = new HashSet<>();
+        for (int i = 0; i < versionSize; i++) {
+          historicalVersions.add(ReadWriteIOUtils.readLong(inputStream));
+        }
+      }
     }
   }
 
@@ -180,7 +194,7 @@ public class TsFileResource {
     return fsFactory.getFile(file + RESOURCE_SUFFIX).exists();
   }
 
-  public void forceUpdateEndTime(String device, long time) {
+  void forceUpdateEndTime(String device, long time) {
       endTimeMap.put(device, time);
   }
 
@@ -203,7 +217,7 @@ public class TsFileResource {
     this.file = file;
   }
 
-  public boolean containsDevice(String deviceId) {
+  boolean containsDevice(String deviceId) {
     return startTimeMap.containsKey(deviceId);
   }
 
@@ -237,7 +251,7 @@ public class TsFileResource {
     chunkMetaDataList = null;
   }
 
-  public TsFileProcessor getUnsealedFileProcessor() {
+  TsFileProcessor getUnsealedFileProcessor() {
     return processor;
   }
 
@@ -245,7 +259,7 @@ public class TsFileResource {
     return writeQueryLock;
   }
 
-  public void doUpgrade() {
+  void doUpgrade() {
     if (UpgradeUtils.isNeedUpgrade(this)) {
       UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
     }
@@ -262,7 +276,7 @@ public class TsFileResource {
     fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
   }
 
-  public void moveTo(File targetDir) throws IOException {
+  void moveTo(File targetDir) throws IOException {
     FileUtils.moveFile(file, new File(targetDir, file.getName()));
     FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
         new File(targetDir, file.getName() + RESOURCE_SUFFIX));
@@ -303,7 +317,7 @@ public class TsFileResource {
     this.deleted = deleted;
   }
 
-  public boolean isMerging() {
+  boolean isMerging() {
     return isMerging;
   }
 
@@ -327,4 +341,12 @@ public class TsFileResource {
     }
     return false;
   }
+
+  public Set<Long> getHistoricalVersions() {
+    return historicalVersions;
+  }
+
+  public void setHistoricalVersions(Set<Long> historicalVersions) {
+    this.historicalVersions = historicalVersions;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 0cdc043..cd011ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -39,19 +39,7 @@ public class FileLoaderUtils {
       try (TsFileSequenceReader reader = new TsFileSequenceReader(
           tsFileResource.getFile().getAbsolutePath())) {
         TsFileMetaData metaData = reader.readFileMetadata();
-        for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
-          TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
-          List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-              .getChunkGroupMetaDataList();
-          for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-            for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-              tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
-                  chunkMetaData.getStartTime());
-              tsFileResource
-                  .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
-            }
-          }
-        }
+        updateTsFileResource(metaData, reader, tsFileResource);
       }
       // write .resource file
       tsFileResource.serialize();
@@ -59,4 +47,21 @@ public class FileLoaderUtils {
       tsFileResource.deSerialize();
     }
   }
+
+  public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
+      TsFileResource tsFileResource) throws IOException {
+    for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
+      TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
+      List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+          .getChunkGroupMetaDataList();
+      for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+        for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+          tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
+              chunkMetaData.getStartTime());
+          tsFileResource
+              .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+        }
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 148cf73..ec29216 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -24,8 +24,10 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -33,6 +35,7 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -107,23 +110,12 @@ public class TsFileRecoverPerformer {
           try (TsFileSequenceReader reader = new TsFileSequenceReader(
               tsFileResource.getFile().getAbsolutePath())) {
             TsFileMetaData metaData = reader.readFileMetadata();
-            List<TsDeviceMetadataIndex> deviceMetadataIndexList = new ArrayList<>(
-                metaData.getDeviceMap().values());
-            for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
-              TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
-              List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-                  .getChunkGroupMetaDataList();
-              for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-                for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-                  tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
-                      chunkMetaData.getStartTime());
-                  tsFileResource
-                      .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
-                }
-              }
-            }
+            FileLoaderUtils.updateTsFileResource(metaData, reader, tsFileResource);
           }
           // write .resource file
+          long fileVersion =
+              Long.parseLong(tsFileResource.getFile().getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
+          tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
           tsFileResource.serialize();
         }
         return;