You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/31 06:52:27 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 310d9779f4 [To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)
310d9779f4 is described below
commit 310d9779f4207898496eff81b92bedde068a4e1a
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Sat Dec 31 14:52:20 2022 +0800
[To rel/0.13] [IOTDB-5266] Fix the issue that the TimeIndex may not be accurate for one device when selecting cross space compaction task (#8688)
---
.../cross/rewrite/TsFileDeviceInfoStore.java | 103 +++++++++++++++++++++
.../selector/RewriteCompactionFileSelector.java | 27 ++++--
.../db/engine/storagegroup/TsFileResource.java | 19 ++++
.../storagegroup/timeindex/DeviceTimeIndex.java | 4 +
.../engine/storagegroup/timeindex/ITimeIndex.java | 9 ++
5 files changed, 155 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
new file mode 100644
index 0000000000..e9f01a9295
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/TsFileDeviceInfoStore.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.cross.rewrite;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TsFileDeviceInfoStore {
+
+ private Map<TsFileResource, TsFileDeviceInfo> cache;
+
+ public TsFileDeviceInfoStore() {
+ cache = new HashMap<>();
+ }
+
+ public TsFileDeviceInfo get(TsFileResource tsFileResource) {
+ return cache.computeIfAbsent(tsFileResource, TsFileDeviceInfo::new);
+ }
+
+ public static class TsFileDeviceInfo {
+ public TsFileResource resource;
+ private Map<String, DeviceInfo> deviceInfoMap;
+
+ public TsFileDeviceInfo(TsFileResource tsFileResource) {
+ this.resource = tsFileResource;
+ }
+
+ private void prepareDeviceInfos() throws IOException {
+ if (deviceInfoMap != null) {
+ return;
+ }
+ deviceInfoMap = new LinkedHashMap<>();
+ if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) == TimeIndexLevel.FILE_TIME_INDEX) {
+ DeviceTimeIndex timeIndex = resource.buildDeviceTimeIndex();
+ for (String deviceId : timeIndex.getDevices()) {
+ deviceInfoMap.put(
+ deviceId,
+ new DeviceInfo(
+ deviceId, timeIndex.getStartTime(deviceId), timeIndex.getEndTime(deviceId)));
+ }
+ } else {
+ for (String deviceId : resource.getDevices()) {
+ deviceInfoMap.put(
+ deviceId,
+ new DeviceInfo(
+ deviceId, resource.getStartTime(deviceId), resource.getEndTime(deviceId)));
+ }
+ }
+ }
+
+ public List<DeviceInfo> getDevices() throws IOException {
+ prepareDeviceInfos();
+ return new ArrayList<>(deviceInfoMap.values());
+ }
+
+ public DeviceInfo getDeviceInfoById(String deviceId) throws IOException {
+ prepareDeviceInfos();
+ return deviceInfoMap.get(deviceId);
+ }
+
+ public boolean containsDevice(String deviceId) throws IOException {
+ prepareDeviceInfos();
+ return deviceInfoMap.containsKey(deviceId);
+ }
+ }
+
+ public static class DeviceInfo {
+ public String deviceId;
+ public long startTime;
+ public long endTime;
+
+ public DeviceInfo(String deviceId, long startTime, long endTime) {
+ this.deviceId = deviceId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index caf5943a4a..e35fe27bdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.TsFileDeviceInfo;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -75,6 +78,10 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
private AbstractCompactionEstimator compactionEstimator;
+ // Cache the DeviceInfos for used seqFiles to avoid loading DeviceTimeIndex more than once from
+ // disk for each seqFile, because each seqFile may be scanned more than once in each selector.
+ private final TsFileDeviceInfoStore deviceInfoStore;
+
public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) {
this.resource = resource;
this.memoryBudget = memoryBudget;
@@ -83,6 +90,7 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
this.maxCrossCompactionFileSize =
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
this.compactionEstimator = new RewriteCrossCompactionEstimator();
+ this.deviceInfoStore = new TsFileDeviceInfoStore();
}
/**
@@ -274,16 +282,21 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
*
* @param unseqFile the tsFileResource of unseqFile to be compacted
*/
- private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+ private void selectOverlappedSeqFiles(TsFileResource unseqFile) throws IOException {
final int SELECT_WARN_THRESHOLD = 10;
- for (String deviceId : unseqFile.getDevices()) {
- long unseqStartTime = unseqFile.getStartTime(deviceId);
- long unseqEndTime = unseqFile.getEndTime(deviceId);
+ // It is unnecessary to cache DeviceInfo for unseqFile in the store because it is only used
+ // once in every selector.
+ TsFileDeviceInfo unseqFileDeviceInfo = new TsFileDeviceInfo(unseqFile);
+ for (DeviceInfo deviceInfo : unseqFileDeviceInfo.getDevices()) {
+ String deviceId = deviceInfo.deviceId;
+ long unseqStartTime = deviceInfo.startTime;
+ long unseqEndTime = deviceInfo.endTime;
boolean noMoreOverlap = false;
for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (!seqFile.mayContainsDevice(deviceId)) {
+ TsFileDeviceInfo seqFileDeviceInfo = deviceInfoStore.get(seqFile);
+ if (!seqFileDeviceInfo.containsDevice(deviceId)) {
continue;
}
int crossSpaceCompactionTimes = 0;
@@ -295,8 +308,8 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
logger.warn("Meets IOException when selecting files for cross space compaction", e);
}
- long seqEndTime = seqFile.getEndTime(deviceId);
- long seqStartTime = seqFile.getStartTime(deviceId);
+ long seqEndTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).endTime;
+ long seqStartTime = seqFileDeviceInfo.getDeviceInfoById(deviceId).startTime;
if (!seqFile.isClosed()) {
// for unclosed file, only select those that overlap with the unseq file
if (unseqEndTime >= seqStartTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 7f6707f98b..4aff97685a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -317,6 +317,25 @@ public class TsFileResource {
}
}
+ public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
+ readLock();
+ try (InputStream inputStream =
+ FSFactoryProducer.getFSFactory()
+ .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) {
+ ReadWriteIOUtils.readByte(inputStream);
+ ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
+ if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
+ throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
+ }
+ return (DeviceTimeIndex) timeIndexFromResourceFile;
+ } catch (Exception e) {
+ throw new IOException(
+ "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e);
+ } finally {
+ readUnlock();
+ }
+ }
+
public void updateStartTime(String device, long time) {
timeIndex.updateStartTime(device, time);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b843de0e8b..8ffd113ed1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -150,6 +150,10 @@ public class DeviceTimeIndex implements ITimeIndex {
return deviceToIndex.keySet();
}
+ public Set<String> getDevices() {
+ return deviceToIndex.keySet();
+ }
+
@Override
public boolean endTimeEmpty() {
for (long endTime : endTimes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index b4e127bd3b..c725a2f8e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -186,4 +187,12 @@ public interface ITimeIndex {
* it may or may not contain this device
*/
boolean mayContainsDevice(String device);
+
+ static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
+ byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
+ if (timeIndexType == -1) {
+ throw new IOException("The end of stream has been reached");
+ }
+ return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
+ }
}