You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/31 01:50:35 UTC

[iotdb] branch xingtanzjr/fix_013_cross_selector created (now 3987c2d3d4)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/fix_013_cross_selector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3987c2d3d4 fix the issue that the TimeIndex is not accurate when selecting cross-space compaction tasks

This branch includes the following new commits:

     new 3987c2d3d4 fix the issue that the TimeIndex is not accurate when selecting cross-space compaction tasks

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix the issue that the TimeIndex is not accurate when selecting cross-space compaction tasks

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_013_cross_selector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3987c2d3d4d62cca9156c524878b1dab6435fed2
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Dec 31 09:50:20 2022 +0800

    fix the issue that the TimeIndex is not accurate when selecting cross-space compaction tasks
---
 .../cross/rewrite/TsFileDeviceInfoStore.java       | 103 +++++++++++++++++++++
 .../selector/RewriteCompactionFileSelector.java    |  27 ++++--
 .../db/engine/storagegroup/TsFileResource.java     |  19 ++++
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   4 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   9 ++
 5 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
new file mode 100644
index 0000000000..e9f01a9295
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.cross.rewrite;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TsFileDeviceInfoStore {
+
+  private Map<TsFileResource, TsFileDeviceInfo> cache;
+
+  public TsFileDeviceInfoStore() {
+    cache = new HashMap<>();
+  }
+
+  public TsFileDeviceInfo get(TsFileResource tsFileResource) {
+    return cache.computeIfAbsent(tsFileResource, TsFileDeviceInfo::new);
+  }
+
+  public static class TsFileDeviceInfo {
+    public TsFileResource resource;
+    private Map<String, DeviceInfo> deviceInfoMap;
+
+    public TsFileDeviceInfo(TsFileResource tsFileResource) {
+      this.resource = tsFileResource;
+    }
+
+    private void prepareDeviceInfos() throws IOException {
+      if (deviceInfoMap != null) {
+        return;
+      }
+      deviceInfoMap = new LinkedHashMap<>();
+      if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) == TimeIndexLevel.FILE_TIME_INDEX) {
+        DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex();
+        for (String deviceId : timeIndex.getDevices()) {
+          deviceInfoMap.put(
+              deviceId,
+              new DeviceInfo(
+                  deviceId, timeIndex.getStartTime(deviceId), timeIndex.getEndTime(deviceId)));
+        }
+      } else {
+        for (String deviceId : resource.getDevices()) {
+          deviceInfoMap.put(
+              deviceId,
+              new DeviceInfo(
+                  deviceId, resource.getStartTime(deviceId), resource.getEndTime(deviceId)));
+        }
+      }
+    }
+
+    public List<DeviceInfo> getDevices() throws IOException {
+      prepareDeviceInfos();
+      return new ArrayList<>(deviceInfoMap.values());
+    }
+
+    public DeviceInfo getDeviceInfoById(String deviceId) throws IOException {
+      prepareDeviceInfos();
+      return deviceInfoMap.get(deviceId);
+    }
+
+    public boolean containsDevice(String deviceId) throws IOException {
+      prepareDeviceInfos();
+      return deviceInfoMap.containsKey(deviceId);
+    }
+  }
+
+  public static class DeviceInfo {
+    public String deviceId;
+    public long startTime;
+    public long endTime;
+
+    public DeviceInfo(String deviceId, long startTime, long endTime) {
+      this.deviceId = deviceId;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index caf5943a4a..f2dffbeecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.TsFileDeviceInfo;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -75,6 +78,10 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
 
   private AbstractCompactionEstimator compactionEstimator;
 
+  // Cache the DeviceInfos of the seqFiles in use so that each DeviceTimeIndex is loaded from disk
+  // at most once, because a seqFile may be scanned more than once within a single selector.
+  private final TsFileDeviceInfoStore deviceInfoStore;
+
   public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) {
     this.resource = resource;
     this.memoryBudget = memoryBudget;
@@ -83,6 +90,7 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
     this.maxCrossCompactionFileSize =
         IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
     this.compactionEstimator = new RewriteCrossCompactionEstimator();
+    this.deviceInfoStore = new TsFileDeviceInfoStore();
   }
 
   /**
@@ -274,16 +282,21 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
    *
    * @param unseqFile the tsFileResource of unseqFile to be compacted
    */
-  private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+  private void selectOverlappedSeqFiles(TsFileResource unseqFile) throws IOException {
     final int SELECT_WARN_THRESHOLD = 10;
-    for (String deviceId : unseqFile.getDevices()) {
-      long unseqStartTime = unseqFile.getStartTime(deviceId);
-      long unseqEndTime = unseqFile.getEndTime(deviceId);
+    // It is unnecessary to cache the DeviceInfo of the unseqFile in the store because it is only
+    // used once in every selector.
+    TsFileDeviceInfo unseqFileDeviceInfo = new TsFileDeviceInfo(unseqFile);
+    for (DeviceInfo deviceInfo : unseqFileDeviceInfo.getDevices()) {
+      String deviceId = deviceInfo.deviceId;
+      long unseqStartTime = deviceInfo.startTime;
+      long unseqEndTime = deviceInfo.endTime;
 
       boolean noMoreOverlap = false;
       for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
         TsFileResource seqFile = resource.getSeqFiles().get(i);
-        if (!seqFile.mayContainsDevice(deviceId)) {
+        TsFileDeviceInfo seqFileDeviceInfo = deviceInfoStore.get(seqFile);
+        if (!seqFileDeviceInfo.containsDevice(deviceId)) {
           continue;
         }
         int crossSpaceCompactionTimes = 0;
@@ -295,8 +308,8 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
           logger.warn("Meets IOException when selecting files for cross space compaction", e);
         }
 
-        long seqEndTime = seqFile.getEndTime(deviceId);
-        long seqStartTime = seqFile.getStartTime(deviceId);
+        long seqEndTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).endTime;
+        long seqStartTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).startTime;
         if (!seqFile.isClosed()) {
           // for unclosed file, only select those that overlap with the unseq file
           if (unseqEndTime >= seqStartTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 7f6707f98b..4aff97685a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -317,6 +317,25 @@ public class TsFileResource {
     }
   }
 
+  public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+    readLock();
+    try (InputStream inputStream =
+        FSFactoryProducer.getFSFactory()
+            .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) {
+      ReadWriteIOUtils.readByte(inputStream);
+      ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
+      if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
+        throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
+      }
+      return (DeviceTimeIndex) timeIndexFromResourceFile;
+    } catch (Exception e) {
+      throw new IOException(
+          "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e);
+    } finally {
+      readUnlock();
+    }
+  }
+
   public void updateStartTime(String device, long time) {
     timeIndex.updateStartTime(device, time);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b843de0e8b..8ffd113ed1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -150,6 +150,10 @@ public class DeviceTimeIndex implements ITimeIndex {
     return deviceToIndex.keySet();
   }
 
+  public Set<String> getDevices() {
+    return deviceToIndex.keySet();
+  }
+
   @Override
   public boolean endTimeEmpty() {
     for (long endTime : endTimes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index b4e127bd3b..c725a2f8e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -186,4 +187,12 @@ public interface ITimeIndex {
    * it may or may not contain this device
    */
   boolean mayContainsDevice(String device);
+
+  static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
+    byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
+    if (timeIndexType == -1) {
+      throw new IOException("The end of stream has been reached");
+    }
+    return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
+  }
 }