You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/31 06:52:27 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 310d9779f4 [To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)
310d9779f4 is described below

commit 310d9779f4207898496eff81b92bedde068a4e1a
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Sat Dec 31 14:52:20 2022 +0800

    [To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)
---
 .../cross/rewrite/TsFileDeviceInfoStore.java       | 103 +++++++++++++++++++++
 .../selector/RewriteCompactionFileSelector.java    |  27 ++++--
 .../db/engine/storagegroup/TsFileResource.java     |  19 ++++
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   4 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   9 ++
 5 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
new file mode 100644
index 0000000000..e9f01a9295
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.cross.rewrite;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Lazily loads and caches the per-device time ranges ({@link DeviceInfo}) of TsFiles.
+ *
+ * <p>Each {@link TsFileResource} gets one {@link TsFileDeviceInfo} entry, so for a file whose
+ * in-memory time index is file-level, the device-level index is read from its resource file on
+ * disk at most once (as long as that read succeeds). No synchronization is performed.
+ */
+public class TsFileDeviceInfoStore {
+
+  private final Map<TsFileResource, TsFileDeviceInfo> cache;
+
+  public TsFileDeviceInfoStore() {
+    cache = new HashMap<>();
+  }
+
+  /**
+   * Returns the {@link TsFileDeviceInfo} of {@code tsFileResource}, creating and caching it on
+   * first use. Creating it does not touch the disk; device infos are loaded on the first query.
+   */
+  public TsFileDeviceInfo get(TsFileResource tsFileResource) {
+    return cache.computeIfAbsent(tsFileResource, TsFileDeviceInfo::new);
+  }
+
+  /** Lazily loaded device time ranges of one TsFile. */
+  public static class TsFileDeviceInfo {
+    public TsFileResource resource;
+    // Null until the first successful load; never published in a partially filled state.
+    private Map<String, DeviceInfo> deviceInfoMap;
+
+    public TsFileDeviceInfo(TsFileResource tsFileResource) {
+      this.resource = tsFileResource;
+    }
+
+    /**
+     * Loads the per-device time ranges of {@link #resource} unless they are already loaded.
+     *
+     * <p>When the resource holds only a file-level time index in memory, its ranges may not be
+     * accurate per device, so the device-level index is re-read from the resource file instead.
+     * The map is assigned only after it has been fully built: if loading throws, the next call
+     * retries instead of silently observing an empty map.
+     *
+     * @throws IOException if the device-level time index cannot be read from disk
+     */
+    private void prepareDeviceInfos() throws IOException {
+      if (deviceInfoMap != null) {
+        return;
+      }
+      Map<String, DeviceInfo> loaded = new LinkedHashMap<>();
+      if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) == TimeIndexLevel.FILE_TIME_INDEX) {
+        DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex();
+        for (String deviceId : timeIndex.getDevices()) {
+          loaded.put(
+              deviceId,
+              new DeviceInfo(
+                  deviceId, timeIndex.getStartTime(deviceId), timeIndex.getEndTime(deviceId)));
+        }
+      } else {
+        for (String deviceId : resource.getDevices()) {
+          loaded.put(
+              deviceId,
+              new DeviceInfo(
+                  deviceId, resource.getStartTime(deviceId), resource.getEndTime(deviceId)));
+        }
+      }
+      deviceInfoMap = loaded;
+    }
+
+    /** Returns a new list holding this file's device infos, in the order they were loaded. */
+    public List<DeviceInfo> getDevices() throws IOException {
+      prepareDeviceInfos();
+      return new ArrayList<>(deviceInfoMap.values());
+    }
+
+    /** Returns the device info of {@code deviceId}, or {@code null} if this file has none. */
+    public DeviceInfo getDeviceInfoById(String deviceId) throws IOException {
+      prepareDeviceInfos();
+      return deviceInfoMap.get(deviceId);
+    }
+
+    /** Returns whether this file has a device info for {@code deviceId}. */
+    public boolean containsDevice(String deviceId) throws IOException {
+      prepareDeviceInfos();
+      return deviceInfoMap.containsKey(deviceId);
+    }
+  }
+
+  /** Start and end time of one device within one TsFile. */
+  public static class DeviceInfo {
+    public String deviceId;
+    public long startTime;
+    public long endTime;
+
+    public DeviceInfo(String deviceId, long startTime, long endTime) {
+      this.deviceId = deviceId;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index caf5943a4a..e35fe27bdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.TsFileDeviceInfo;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -75,6 +78,10 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
 
   private AbstractCompactionEstimator compactionEstimator;
 
+  // Cache the DeviceInfos of used seqFiles to avoid loading a DeviceTimeIndex from disk more than
+  // once per seqFile, because each seqFile may be scanned multiple times by one selector.
+  private final TsFileDeviceInfoStore deviceInfoStore;
+
   public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) {
     this.resource = resource;
     this.memoryBudget = memoryBudget;
@@ -83,6 +90,7 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
     this.maxCrossCompactionFileSize =
         IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
     this.compactionEstimator = new RewriteCrossCompactionEstimator();
+    this.deviceInfoStore = new TsFileDeviceInfoStore();
   }
 
   /**
@@ -274,16 +282,21 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
    *
    * @param unseqFile the tsFileResource of unseqFile to be compacted
    */
-  private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+  private void selectOverlappedSeqFiles(TsFileResource unseqFile) throws IOException {
     final int SELECT_WARN_THRESHOLD = 10;
-    for (String deviceId : unseqFile.getDevices()) {
-      long unseqStartTime = unseqFile.getStartTime(deviceId);
-      long unseqEndTime = unseqFile.getEndTime(deviceId);
+    // There is no need to cache the unseqFile's DeviceInfo in the store because it is only used
+    // once per selector.
+    TsFileDeviceInfo unseqFileDeviceInfo = new TsFileDeviceInfo(unseqFile);
+    for (DeviceInfo deviceInfo : unseqFileDeviceInfo.getDevices()) {
+      String deviceId = deviceInfo.deviceId;
+      long unseqStartTime = deviceInfo.startTime;
+      long unseqEndTime = deviceInfo.endTime;
 
       boolean noMoreOverlap = false;
       for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
         TsFileResource seqFile = resource.getSeqFiles().get(i);
-        if (!seqFile.mayContainsDevice(deviceId)) {
+        TsFileDeviceInfo seqFileDeviceInfo = deviceInfoStore.get(seqFile);
+        if (!seqFileDeviceInfo.containsDevice(deviceId)) {
           continue;
         }
         int crossSpaceCompactionTimes = 0;
@@ -295,8 +308,8 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
           logger.warn("Meets IOException when selecting files for cross space compaction", e);
         }
 
-        long seqEndTime = seqFile.getEndTime(deviceId);
-        long seqStartTime = seqFile.getStartTime(deviceId);
+        long seqEndTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).endTime;
+        long seqStartTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).startTime;
         if (!seqFile.isClosed()) {
           // for unclosed file, only select those that overlap with the unseq file
           if (unseqEndTime >= seqStartTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 7f6707f98b..4aff97685a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -317,6 +317,38 @@ public class TsFileResource {
     }
   }
 
+  /**
+   * Reads this TsFile's resource file ({@code file.getPath() + RESOURCE_SUFFIX}) from disk and
+   * returns the time index stored in it as a {@link DeviceTimeIndex}, independently of the time
+   * index this object currently holds in memory. The read happens under the read lock.
+   *
+   * @return the device-level time index deserialized from the resource file
+   * @throws IOException if the resource file cannot be read or deserialized, or if the time index
+   *     stored in it is not a {@link DeviceTimeIndex}
+   */
+  public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+    ITimeIndex timeIndexFromResourceFile;
+    readLock();
+    try (InputStream inputStream =
+        FSFactoryProducer.getFSFactory()
+            .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) {
+      // NOTE(review): the first byte is skipped before the time index type byte; presumably a
+      // resource-file version marker - confirm against the resource serialization code.
+      ReadWriteIOUtils.readByte(inputStream);
+      timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
+    } catch (Exception e) {
+      throw new IOException(
+          "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e);
+    } finally {
+      readUnlock();
+    }
+    // Checked outside the try block so this specific error is not re-wrapped as a read failure.
+    if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
+      throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
+    }
+    return (DeviceTimeIndex) timeIndexFromResourceFile;
+  }
+
   public void updateStartTime(String device, long time) {
     timeIndex.updateStartTime(device, time);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b843de0e8b..8ffd113ed1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -150,6 +150,16 @@ public class DeviceTimeIndex implements ITimeIndex {
     return deviceToIndex.keySet();
   }
 
+  /**
+   * Returns the ids of the devices recorded in this index.
+   *
+   * <p>The returned set is the live key set of {@code deviceToIndex}, not a copy; callers should
+   * only read it.
+   */
+  public Set<String> getDevices() {
+    return this.deviceToIndex.keySet();
+  }
+
   @Override
   public boolean endTimeEmpty() {
     for (long endTime : endTimes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index b4e127bd3b..c725a2f8e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -186,4 +187,22 @@ public interface ITimeIndex {
    * it may or may not contain this device
    */
   boolean mayContainsDevice(String device);
+
+  /**
+   * Reads a one-byte time index type from {@code inputStream}, then deserializes the time index of
+   * that level which follows it.
+   *
+   * @param inputStream stream positioned at the time index type byte
+   * @return the deserialized time index
+   * @throws IOException if the stream ends before the type byte, or as propagated from reading
+   */
+  static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
+    final byte typeTag = ReadWriteIOUtils.readByte(inputStream);
+    // NOTE(review): assumes readByte yields -1 at end of stream; confirm in ReadWriteIOUtils.
+    if (typeTag == -1) {
+      throw new IOException("The end of stream has been reached");
+    }
+    TimeIndexLevel level = TimeIndexLevel.valueOf(typeTag);
+    return level.getTimeIndex().deserialize(inputStream);
+  }
 }