Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/29 02:47:41 UTC

[incubator-iotdb] branch merge updated: change back recover and query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/merge by this push:
     new e55e752  change back recover and query
e55e752 is described below

commit e55e75254ae0f890919052152030bfe6b2c96b52
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Jul 29 10:48:00 2020 +0800

    change back recover and query
---
 .../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++--------
 .../writelog/recover/TsFileRecoverPerformer.java   | 50 ++++++++--------------
 2 files changed, 34 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3de4202..2fa9008 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -551,12 +551,9 @@ public class StorageGroupProcessor {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
 
-      List<List<TsFileResource>> defaultVmTsFileResources = new ArrayList<>();
-      defaultVmTsFileResources.add(new ArrayList<>());
-
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
           storageGroupName + FILE_NAME_SEPARATOR,
-          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
+          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
           i == tsFiles.size() - 1);
 
       RestorableTsFileIOWriter writer;
@@ -580,15 +577,14 @@ public class StorageGroupProcessor {
               this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
               true, writer);
           workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
-          tsFileResource.setProcessor(tsFileProcessor);
         } else {
           tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
               getVersionControllerByTimePartitionId(timePartitionId),
               this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
               writer);
           workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
-          tsFileResource.setProcessor(tsFileProcessor);
         }
+        tsFileResource.setProcessor(tsFileProcessor);
         tsFileResource.removeResourceFile();
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
@@ -744,11 +740,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in insertTabletPlan
+   * @param end              end index of rows to be inserted in insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -898,10 +894,10 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param fileList file list to add new processor
-   * @param sequence whether is sequence or not
+   * @param fileList               file list to add new processor
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1347,10 +1343,10 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param deviceId      the deviceId of the timeseries to be deleted.
    * @param measurementId the measurementId of the timeseries to be deleted.
-   * @param startTime the startTime of delete range.
-   * @param endTime the endTime of delete range.
+   * @param startTime     the startTime of delete range.
+   * @param endTime       the endTime of delete range.
    */
   public void delete(String deviceId, String measurementId, long startTime, long endTime)
       throws IOException {
@@ -2107,9 +2103,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
    * version number is the version number in the tsfile with a larger timestamp.
    *
-   * @param tsfileName origin tsfile name
+   * @param tsfileName  origin tsfile name
    * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   * 1]
+   *                    1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2173,8 +2169,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
-   * @param tsFileResource tsfile resource to be loaded
+   * @param type            load type
+   * @param tsFileResource  tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
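
The Javadoc in the @@ -2107,9 +2103,9 @@ hunk above describes getFileNameForLoadingFile's renaming rule only in prose. A minimal sketch of just that arithmetic, using a hypothetical helper (the method name and the {time}-{version}-{mergeCnt}.tsfile layout are illustrative assumptions, not the real method body):

    // Illustrative only: mirrors the rule stated in the Javadoc, not the actual implementation.
    // The loaded file takes the mean of the two neighbouring files' timestamps and the
    // version number of the neighbour with the larger timestamp.
    static String exampleLoadedFileName(long leftTime, long leftVersion,
                                        long rightTime, long rightVersion) {
      long meanTime = (leftTime + rightTime) / 2;                       // mean of the two timestamps
      long version = rightTime > leftTime ? rightVersion : leftVersion; // version of the later file
      return meanTime + "-" + version + "-0.tsfile";                    // assumed {time}-{version}-{mergeCnt} name
    }
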
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f9ceaa7..be0ebb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -23,15 +23,12 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -46,7 +43,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,12 +55,12 @@ public class TsFileRecoverPerformer {
 
   private static final Logger logger = LoggerFactory.getLogger(TsFileRecoverPerformer.class);
 
-  private String filePath;
-  private String logNodePrefix;
-  private VersionController versionController;
-  private TsFileResource tsFileResource;
-  private boolean sequence;
-  private boolean isLastFile;
+  private final String filePath;
+  private final String logNodePrefix;
+  private final VersionController versionController;
+  private final TsFileResource tsFileResource;
+  private final boolean sequence;
+  private final boolean isLastFile;
 
   /**
    * @param isLastFile whether this TsFile is the last file of its partition
@@ -110,29 +106,22 @@ public class TsFileRecoverPerformer {
 
     // judge whether tsfile is complete
     if (!restorableTsFileIOWriter.hasCrashed()) {
-      // if tsfile is complete, then there is no vmfiles.
       try {
-        recoverResource(tsFileResource);
+        recoverResource();
         return restorableTsFileIOWriter;
       } catch (IOException e) {
         throw new StorageGroupProcessorException(
-            "recover the resource file failed: " + filePath
-                + RESOURCE_SUFFIX + e);
+            "recover the resource file failed: " + filePath + RESOURCE_SUFFIX + e);
       }
     }
 
     // tsfile has crashed
     // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
     // map must be updated first to avoid duplicated insertion
-    recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+    recoverResourceFromWriter(restorableTsFileIOWriter);
 
-    // If the vm is not enable, the walTargetWriter points to  the tsfile.
-    // If the vm is enable and flush log exists, the walTargetWriter points to the vm of the flush log
-    // if the vm is enable and flush log does not exist, the walTargetWriter is null.
-    RestorableTsFileIOWriter walTargetWriter = restorableTsFileIOWriter;
-    TsFileResource walTargetResource = tsFileResource;
     // redo logs
-    redoLogs(walTargetWriter, walTargetResource, restorableTsFileIOWriter);
+    redoLogs(restorableTsFileIOWriter);
 
     // clean logs
     try {
@@ -144,10 +133,10 @@ public class TsFileRecoverPerformer {
     return restorableTsFileIOWriter;
   }
 
-  private void recoverResource(TsFileResource tsFileResource) throws IOException {
+  private void recoverResource() throws IOException {
     if (tsFileResource.resourceFileExists()) {
       // .resource file exists, deserialize it
-      recoverResourceFromFile(tsFileResource);
+      recoverResourceFromFile();
     } else {
       // .resource file does not exist, read file metadata and recover tsfile resource
       try (TsFileSequenceReader reader = new TsFileSequenceReader(
@@ -163,18 +152,18 @@ public class TsFileRecoverPerformer {
     }
   }
 
-  private void recoverResourceFromFile(TsFileResource tsFileResource) throws IOException {
+  private void recoverResourceFromFile() throws IOException {
     try {
       tsFileResource.deserialize();
     } catch (IOException e) {
       logger.warn("Cannot deserialize TsFileResource {}, construct it using "
           + "TsFileSequenceReader", tsFileResource.getTsFile(), e);
-      recoverResourceFromReader(tsFileResource);
+      recoverResourceFromReader();
     }
   }
 
 
-  private void recoverResourceFromReader(TsFileResource tsFileResource) throws IOException {
+  private void recoverResourceFromReader() throws IOException {
     try (TsFileSequenceReader reader =
         new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) {
       for (Entry<String, List<TimeseriesMetadata>> entry : reader.getAllTimeseriesMetadata()
@@ -192,8 +181,7 @@ public class TsFileRecoverPerformer {
   }
 
 
-  private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter,
-      TsFileResource tsFileResource) {
+  private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter) {
     Map<String, List<ChunkMetadata>> deviceChunkMetaDataMap =
         restorableTsFileIOWriter.getDeviceChunkMetadataMap();
     for (Map.Entry<String, List<ChunkMetadata>> entry : deviceChunkMetaDataMap.entrySet()) {
@@ -209,15 +197,13 @@ public class TsFileRecoverPerformer {
     tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
   }
 
-  private boolean redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
-      TsFileResource tsFileResource, RestorableTsFileIOWriter tsFileIOWriter)
+  private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
       throws StorageGroupProcessorException {
     IMemTable recoverMemTable = new PrimitiveMemTable();
     recoverMemTable.setVersion(versionController.nextVersion());
     LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
         versionController, tsFileResource, recoverMemTable, sequence);
     logReplayer.replayLogs();
-    boolean res = false;
     try {
       if (!recoverMemTable.isEmpty()) {
         // flush logs
@@ -225,7 +211,6 @@ public class TsFileRecoverPerformer {
             restorableTsFileIOWriter,
             tsFileResource.getTsFile().getParentFile().getParentFile().getName());
         tableFlushTask.syncFlushMemTable();
-        res = true;
       }
 
       if (!isLastFile || tsFileResource.isCloseFlagSet()) {
@@ -236,7 +221,6 @@ public class TsFileRecoverPerformer {
       }
      // otherwise this file is not closed before crash, do nothing so we can continue writing
       // into it
-      return res;
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
     }
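
Pulling the context lines of the second file together, a condensed sketch of how recover() reads after this revert; the method names come from the diff, but the writer construction, exception handling and WAL log cleanup are simplified, so treat this as an outline rather than the actual class:

    // Condensed sketch of TsFileRecoverPerformer.recover() after this commit (outline only).
    public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
      // writer construction and its error handling are omitted here
      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(tsFileResource.getTsFile());
      if (!writer.hasCrashed()) {
        recoverResource();               // complete file: rebuild the .resource from file metadata
        return writer;
      }
      // crashed file: the last ChunkGroup may duplicate data still sitting in the WAL,
      // so the device time map is rebuilt from the writer before the WAL is replayed
      recoverResourceFromWriter(writer);
      redoLogs(writer);                  // replay WAL into a memtable, flush it if non-empty
      // clean WAL logs (omitted), then hand the writer back to the caller
      return writer;
    }

The rest of this file's diff is the matching cleanup: recoverResource, recoverResourceFromFile, recoverResourceFromReader, recoverResourceFromWriter and redoLogs drop their redundant TsFileResource/writer parameters (the class already holds them as now-final fields), and redoLogs returns void since the simplified recover() no longer consumes its boolean result.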