You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/12/25 11:47:32 UTC
[iotdb] 02/03: Add configuration
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch time_index
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a74e06c8f024daa5ad464db795bdacf320a83798
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Fri Dec 25 19:26:25 2020 +0800
Add configuration
---
.../resources/conf/iotdb-engine.properties | 4 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +-
.../engine/storagegroup/StorageGroupProcessor.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 78 ++-------
.../storagegroup/timeindex/DeviceTimeIndex.java | 122 ++++++++++----
.../storagegroup/timeindex/FileTimeIndex.java | 177 +++++++++++++++++++++
.../engine/storagegroup/timeindex/ITimeIndex.java | 64 ++++++++
.../storagegroup/timeindex/TimeIndexLevel.java} | 17 +-
.../db/exception/PartitionViolationException.java | 4 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 4 +
.../db/integration/IoTDBLoadExternalTsfileIT.java | 20 ++-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 2 -
13 files changed, 403 insertions(+), 117 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 05d773e..26f6fe4 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -230,6 +230,10 @@ mtree_snapshot_interval=100000
# Only take effect when enable_mtree_snapshot=true.
mtree_snapshot_threshold_time=3600
+# Level of TimeIndex, which records the start time and end time of TsFileResource. Currently,
+# DEVICE_TIME_INDEX and FILE_TIME_INDEX are supported, and could not be changed after first set.
+time_index_level=DEVICE_TIME_INDEX
+
####################
### Memory Control Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 27af6b1..4e037be 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.LoadConfigurationException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
@@ -175,7 +176,7 @@ public class IoTDBConfig {
/**
* When inserting rejected exceeds this, throw an exception
*/
- private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
+ private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
/**
* Is the write ahead log enable.
*/
@@ -747,6 +748,12 @@ public class IoTDBConfig {
*/
private long partitionInterval = 604800;
+ /**
+ * Level of TimeIndex, which records the start time and end time of TsFileResource. Currently,
+ * DEVICE_TIME_INDEX and FILE_TIME_INDEX are supported, and could not be changed after first set.
+ */
+ private TimeIndexLevel timeIndexLevel = TimeIndexLevel.DEVICE_TIME_INDEX;
+
//just for test
//wait for 60 second by default.
private int thriftServerAwaitTimeForStopService = 60;
@@ -854,6 +861,14 @@ public class IoTDBConfig {
this.partitionInterval = partitionInterval;
}
+ public TimeIndexLevel getTimeIndexLevel() {
+ return timeIndexLevel;
+ }
+
+ public void setTimeIndexLevel(String timeIndexLevel) {
+ this.timeIndexLevel = TimeIndexLevel.valueOf(timeIndexLevel);
+ }
+
void updatePath() {
formulateFolders();
confirmMultiDirStrategy();
@@ -1224,7 +1239,7 @@ public class IoTDBConfig {
public void setEstimatedSeriesSize(int estimatedSeriesSize) {
this.estimatedSeriesSize = estimatedSeriesSize;
}
-
+
public boolean isChunkBufferPoolEnable() {
return chunkBufferPoolEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c6b7847..9f3c8b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -240,7 +240,7 @@ public class IoTDBDescriptor {
}
int mlogBufferSize = Integer.parseInt(properties.getProperty("mlog_buffer_size",
- Integer.toString(conf.getMlogBufferSize())));
+ Integer.toString(conf.getMlogBufferSize())));
if (mlogBufferSize > 0) {
conf.setMlogBufferSize(mlogBufferSize);
}
@@ -498,6 +498,9 @@ public class IoTDBDescriptor {
// Integer.parseInt(properties.getProperty("concurrent_writing_time_partition",
// String.valueOf(conf.getConcurrentWritingTimePartition()))));
+ conf.setTimeIndexLevel(
+ properties.getProperty("time_index_level", String.valueOf(conf.getTimeIndexLevel())));
+
// the default fill interval in LinearFill and PreviousFill
conf.setDefaultFillInterval(
Integer.parseInt(properties.getProperty("default_fill_interval",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index acee17f..c38a5db 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1522,7 +1522,7 @@ public class StorageGroupProcessor {
endTime = Long.MAX_VALUE;
}
- if (tsFileResource.containsDevice(deviceId) &&
+ if (tsFileResource.getDevices().contains(deviceId) &&
(deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart <= endTime)) {
return false;
}
@@ -1901,7 +1901,7 @@ public class StorageGroupProcessor {
return POS_ALREADY_EXIST;
}
long localPartitionId = Long.parseLong(localFile.getTsFile().getParentFile().getName());
- if (i == sequenceList.size() - 1 && localFile.areEndTimesEmpty()
+ if (i == sequenceList.size() - 1 && localFile.endTimeEmpty()
|| newFilePartitionId > localPartitionId) {
// skip files that are in the previous partition and the last empty file, as the all data
// in those files must be older than the new file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2d7ae26..d8cad71 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -33,11 +33,11 @@ import java.util.Random;
import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.service.UpgradeSevice;
@@ -76,7 +76,7 @@ public class TsFileResource {
private TsFileProcessor processor;
- private DeviceTimeIndex fileIndex;
+ private ITimeIndex fileIndex;
private ModificationFile modFile;
@@ -165,7 +165,7 @@ public class TsFileResource {
*/
public TsFileResource(File file) {
this.file = file;
- this.fileIndex = new DeviceTimeIndex();
+ this.fileIndex = config.getTimeIndexLevel().getTimeIndex();
}
/**
@@ -173,7 +173,7 @@ public class TsFileResource {
*/
public TsFileResource(File file, TsFileProcessor processor) {
this.file = file;
- this.fileIndex = new DeviceTimeIndex();
+ this.fileIndex = config.getTimeIndexLevel().getTimeIndex();
this.processor = processor;
}
@@ -253,7 +253,7 @@ public class TsFileResource {
public void deserialize() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(
file + RESOURCE_SUFFIX)) {
- fileIndex = DeviceTimeIndex.deserialize(inputStream);
+ fileIndex = config.getTimeIndexLevel().getTimeIndex().deserialize(inputStream);
maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
@@ -268,17 +268,11 @@ public class TsFileResource {
}
public void updateStartTime(String device, long time) {
- long startTime = getStartTime(device);
- if (time < startTime) {
- fileIndex.putStartTime(device, time);
- }
+ fileIndex.updateStartTime(device, time);
}
public void updateEndTime(String device, long time) {
- long endTime = getEndTime(device);
- if (time > endTime) {
- fileIndex.putEndTime(device, time);
- }
+ fileIndex.updateEndTime(device, time);
}
public boolean resourceFileExists() {
@@ -304,10 +298,6 @@ public class TsFileResource {
this.file = file;
}
- boolean containsDevice(String deviceId) {
- return fileIndex.containsDevice(deviceId);
- }
-
public File getTsFile() {
return file;
}
@@ -329,16 +319,11 @@ public class TsFileResource {
}
public Set<String> getDevices() {
- return fileIndex.getDeviceToIndex().keySet();
+ return fileIndex.getDevices();
}
- public boolean areEndTimesEmpty() {
- for (long endTime : fileIndex.getEndTimes()) {
- if (endTime != Long.MIN_VALUE) {
- return false;
- }
- }
- return true;
+ public boolean endTimeEmpty() {
+ return fileIndex.endTimeEmpty();
}
public boolean isClosed() {
@@ -353,7 +338,7 @@ public class TsFileResource {
}
processor = null;
chunkMetadataList = null;
- fileIndex.trimStartEndTimes();
+ fileIndex.close();
}
TsFileProcessor getUnsealedFileProcessor() {
@@ -494,16 +479,7 @@ public class TsFileResource {
* check if any of the device lives over the given time bound
*/
public boolean stillLives(long timeLowerBound) {
- if (timeLowerBound == Long.MAX_VALUE) {
- return true;
- }
- for (long endTime : fileIndex.getEndTimes()) {
- // the file cannot be deleted if any device still lives
- if (endTime >= timeLowerBound) {
- return true;
- }
- }
- return false;
+ return fileIndex.stillLives(timeLowerBound);
}
/**
@@ -511,7 +487,7 @@ public class TsFileResource {
*/
public boolean isSatisfied(String deviceId,
Filter timeFilter, boolean isSeq, long ttl) {
- if (!containsDevice(deviceId)) {
+ if (!getDevices().contains(deviceId)) {
if (config.isDebugOn()) {
DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of no device!", deviceId,
file);
@@ -585,7 +561,7 @@ public class TsFileResource {
* make sure Either the deviceToIndex is not empty Or the path contains a partition folder
*/
public long getTimePartition() {
- return fileIndex.getTimePartition(this);
+ return fileIndex.getTimePartition(file.getAbsolutePath());
}
/**
@@ -595,31 +571,7 @@ public class TsFileResource {
* @throws PartitionViolationException if the data of the file cross partitions or it is empty
*/
public long getTimePartitionWithCheck() throws PartitionViolationException {
- long partitionId = -1;
- for (Long startTime : fileIndex.getStartTimes()) {
- long p = StorageEngine.getTimePartition(startTime);
- if (partitionId == -1) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- throw new PartitionViolationException(this);
- }
- }
- }
- for (Long endTime : fileIndex.getEndTimes()) {
- long p = StorageEngine.getTimePartition(endTime);
- if (partitionId == -1) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- throw new PartitionViolationException(this);
- }
- }
- }
- if (partitionId == -1) {
- throw new PartitionViolationException(this);
- }
- return partitionId;
+ return fileIndex.getTimePartitionWithCheck(file.toString());
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index bc96380..43cf832 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -26,18 +26,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.rescon.CachedStringPool;
+import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-public class DeviceTimeIndex {
-
- private static final Map<String, String> cachedDevicePool = CachedStringPool.getInstance()
- .getCachedPool();
+public class DeviceTimeIndex implements ITimeIndex {
protected static final int INIT_ARRAY_SIZE = 64;
@@ -67,6 +64,7 @@ public class DeviceTimeIndex {
this.deviceToIndex = deviceToIndex;
}
+ @Override
public void init() {
this.deviceToIndex = new ConcurrentHashMap<>();
this.startTimes = new long[INIT_ARRAY_SIZE];
@@ -75,6 +73,7 @@ public class DeviceTimeIndex {
initTimes(endTimes, Long.MIN_VALUE);
}
+ @Override
public void serialize(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(deviceToIndex.size(), outputStream);
for (Entry<String, Integer> entry : deviceToIndex.entrySet()) {
@@ -84,7 +83,8 @@ public class DeviceTimeIndex {
}
}
- public static DeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+ @Override
+ public DeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
int size = ReadWriteIOUtils.readInt(inputStream);
Map<String, Integer> deviceMap = new HashMap<>();
long[] startTimesArray = new long[size];
@@ -97,39 +97,55 @@ public class DeviceTimeIndex {
deviceMap.put(cachedPath, i);
startTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
- endTimesArray[i] =ReadWriteIOUtils.readLong(inputStream);
+ endTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
}
return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
}
- public long[] getStartTimes() {
- return startTimes;
- }
-
- public void setStartTimes(long[] startTimes) {
- this.startTimes = startTimes;
+ @Override
+ public void close() {
+ trimStartEndTimes();
}
- public long[] getEndTimes() {
- return endTimes;
+ @Override
+ public Set<String> getDevices() {
+ return deviceToIndex.keySet();
}
- public void setEndTimes(long[] endTimes) {
- this.endTimes = endTimes;
+ @Override
+ public boolean endTimeEmpty() {
+ for (long endTime : endTimes) {
+ if (endTime != Long.MIN_VALUE) {
+ return false;
+ }
+ }
+ return true;
}
- public Map<String, Integer> getDeviceToIndex() {
- return deviceToIndex;
+ @Override
+ public boolean stillLives(long timeLowerBound) {
+ if (timeLowerBound == Long.MAX_VALUE) {
+ return true;
+ }
+ for (long endTime : endTimes) {
+ // the file cannot be deleted if any device still lives
+ if (endTime >= timeLowerBound) {
+ return true;
+ }
+ }
+ return false;
}
+ @Override
public long calculateRamSize() {
return RamUsageEstimator.sizeOf(deviceToIndex) + RamUsageEstimator.sizeOf(startTimes) +
RamUsageEstimator.sizeOf(endTimes);
}
+ @Override
public long estimateRamIncrement(String deviceToBeChecked) {
long ramIncrement = 0L;
- if (!containsDevice(deviceToBeChecked)) {
+ if (!deviceToIndex.containsKey(deviceToBeChecked)) {
// 80 is the Map.Entry header ram size
if (deviceToIndex.isEmpty()) {
ramIncrement += 80;
@@ -144,18 +160,14 @@ public class DeviceTimeIndex {
return ramIncrement;
}
- public boolean containsDevice(String deviceId) {
- return deviceToIndex.containsKey(deviceId);
- }
-
- public void trimStartEndTimes() {
+ private void trimStartEndTimes() {
startTimes = Arrays.copyOfRange(startTimes, 0, deviceToIndex.size());
endTimes = Arrays.copyOfRange(endTimes, 0, deviceToIndex.size());
}
- public int getDeviceIndex(String deviceId) {
+ private int getDeviceIndex(String deviceId) {
int index;
- if (containsDevice(deviceId)) {
+ if (deviceToIndex.containsKey(deviceId)) {
index = deviceToIndex.get(deviceId);
} else {
index = deviceToIndex.size();
@@ -179,26 +191,65 @@ public class DeviceTimeIndex {
return tmp;
}
- public long getTimePartition(TsFileResource resource) {
+ @Override
+ public long getTimePartition(String file) {
try {
if (deviceToIndex != null && !deviceToIndex.isEmpty()) {
return StorageEngine.getTimePartition(startTimes[deviceToIndex.values().iterator().next()]);
}
- String[] splits = FilePathUtils.splitTsFilePath(resource);
- return Long.parseLong(splits[splits.length - 2]);
+ String[] filePathSplits = FilePathUtils.splitTsFilePath(file);
+ return Long.parseLong(filePathSplits[filePathSplits.length - 2]);
} catch (NumberFormatException e) {
return 0;
}
}
- public void putStartTime(String deviceId, long startTime) {
- startTimes[getDeviceIndex(deviceId)] = startTime;
+ @Override
+ public long getTimePartitionWithCheck(String file) throws PartitionViolationException {
+ long partitionId = -1;
+ for (Long startTime : startTimes) {
+ long p = StorageEngine.getTimePartition(startTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(file);
+ }
+ }
+ }
+ for (Long endTime : endTimes) {
+ long p = StorageEngine.getTimePartition(endTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(file);
+ }
+ }
+ }
+ if (partitionId == -1) {
+ throw new PartitionViolationException(file);
+ }
+ return partitionId;
+ }
+
+ @Override
+ public void updateStartTime(String deviceId, long time) {
+ long startTime = getStartTime(deviceId);
+ if (time < startTime) {
+ startTimes[getDeviceIndex(deviceId)] = time;
+ }
}
- public void putEndTime(String deviceId, long endTime) {
- endTimes[getDeviceIndex(deviceId)] = endTime;
+ @Override
+ public void updateEndTime(String deviceId, long time) {
+ long endTime = getEndTime(deviceId);
+ if (time > endTime) {
+ endTimes[getDeviceIndex(deviceId)] = time;
+ }
}
+ @Override
public long getStartTime(String deviceId) {
if (!deviceToIndex.containsKey(deviceId)) {
return Long.MAX_VALUE;
@@ -206,6 +257,7 @@ public class DeviceTimeIndex {
return startTimes[deviceToIndex.get(deviceId)];
}
+ @Override
public long getEndTime(String deviceId) {
if (!deviceToIndex.containsKey(deviceId)) {
return Long.MIN_VALUE;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
new file mode 100644
index 0000000..eef6e0b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup.timeindex;
+
+import io.netty.util.internal.ConcurrentSet;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class FileTimeIndex implements ITimeIndex {
+
+ /**
+ * start time
+ */
+ protected long startTime;
+
+ /**
+ * end times. The value is Long.MIN_VALUE if it's an unsealed sequence tsfile
+ */
+ protected long endTime;
+
+ /**
+ * devices
+ */
+ protected Set<String> devices;
+
+ public FileTimeIndex() {
+ init();
+ }
+
+ public FileTimeIndex(Set<String> devices, long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.devices = devices;
+ }
+
+ @Override
+ public void init() {
+ this.devices = new ConcurrentSet<>();
+ this.startTime = Long.MAX_VALUE;
+ this.endTime = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(devices.size(), outputStream);
+ for (String device : devices) {
+ ReadWriteIOUtils.write(device, outputStream);
+ }
+ ReadWriteIOUtils.write(startTime, outputStream);
+ ReadWriteIOUtils.write(endTime, outputStream);
+ }
+
+ @Override
+ public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ Set<String> deviceSet = new HashSet<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ // To reduce the String number in memory,
+ // use the deviceId from memory instead of the deviceId read from disk
+ String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
+ deviceSet.add(cachedPath);
+ }
+ long startTime = ReadWriteIOUtils.readLong(inputStream);
+ long endTime = ReadWriteIOUtils.readLong(inputStream);
+ return new FileTimeIndex(deviceSet, startTime, endTime);
+ }
+
+ @Override
+ public void close() {
+ // allowed to be null
+ }
+
+ @Override
+ public Set<String> getDevices() {
+ return devices;
+ }
+
+ @Override
+ public boolean endTimeEmpty() {
+ return endTime == Long.MIN_VALUE;
+ }
+
+ @Override
+ public boolean stillLives(long timeLowerBound) {
+ if (timeLowerBound == Long.MAX_VALUE) {
+ return true;
+ }
+ // the file cannot be deleted if any device still lives
+ return endTime >= timeLowerBound;
+ }
+
+ @Override
+ public long calculateRamSize() {
+ return RamUsageEstimator.sizeOf(devices) + RamUsageEstimator.sizeOf(startTime) +
+ RamUsageEstimator.sizeOf(endTime);
+ }
+
+ @Override
+ public long estimateRamIncrement(String deviceToBeChecked) {
+ return devices.contains(deviceToBeChecked) ? 0L : RamUsageEstimator.sizeOf(deviceToBeChecked);
+ }
+
+ @Override
+ public long getTimePartition(String file) {
+ try {
+ if (devices != null && !devices.isEmpty()) {
+ return StorageEngine.getTimePartition(startTime);
+ }
+ String[] filePathSplits = FilePathUtils.splitTsFilePath(file);
+ return Long.parseLong(filePathSplits[filePathSplits.length - 2]);
+ } catch (NumberFormatException e) {
+ return 0;
+ }
+ }
+
+ @Override
+ public long getTimePartitionWithCheck(String file) throws PartitionViolationException {
+ long startPartitionId = StorageEngine.getTimePartition(startTime);
+ long endPartitionId = StorageEngine.getTimePartition(endTime);
+ if (startPartitionId != endPartitionId) {
+ throw new PartitionViolationException(file);
+ }
+ return startPartitionId;
+ }
+
+ @Override
+ public void updateStartTime(String deviceId, long startTime) {
+ devices.add(deviceId);
+ if (this.startTime > startTime) {
+ this.startTime = startTime;
+ }
+ }
+
+ @Override
+ public void updateEndTime(String deviceId, long endTime) {
+ devices.add(deviceId);
+ if (this.endTime < endTime) {
+ this.endTime = endTime;
+ }
+ }
+
+ @Override
+ public long getStartTime(String deviceId) {
+ return startTime;
+ }
+
+ @Override
+ public long getEndTime(String deviceId) {
+ return endTime;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
new file mode 100644
index 0000000..6a6c5d4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup.timeindex;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.db.rescon.CachedStringPool;
+
+public interface ITimeIndex {
+
+ Map<String, String> cachedDevicePool = CachedStringPool.getInstance()
+ .getCachedPool();
+
+ void init();
+
+ void serialize(OutputStream outputStream) throws IOException;
+
+ ITimeIndex deserialize(InputStream inputStream) throws IOException;
+
+ void close();
+
+ Set<String> getDevices();
+
+ boolean endTimeEmpty();
+
+ boolean stillLives(long timeLowerBound);
+
+ long calculateRamSize();
+
+ long estimateRamIncrement(String deviceToBeChecked);
+
+ long getTimePartition(String file);
+
+ long getTimePartitionWithCheck(String file) throws PartitionViolationException;
+
+ void updateStartTime(String deviceId, long startTime);
+
+ void updateEndTime(String deviceId, long endTime);
+
+ long getStartTime(String deviceId);
+
+ long getEndTime(String deviceId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
copy to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
index c794b61..e1a80b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
@@ -17,13 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.db.exception;
+package org.apache.iotdb.db.engine.storagegroup.timeindex;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+public enum TimeIndexLevel {
+ FILE_TIME_INDEX, DEVICE_TIME_INDEX;
-public class PartitionViolationException extends LoadFileException{
-
- public PartitionViolationException(TsFileResource resource) {
- super(String.format("The data of file %s cross partitions", resource));
+ public ITimeIndex getTimeIndex() {
+ switch (this) {
+ case FILE_TIME_INDEX:
+ return new FileTimeIndex();
+ case DEVICE_TIME_INDEX:
+ default:
+ return new DeviceTimeIndex();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
index c794b61..ac65b99 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
public class PartitionViolationException extends LoadFileException{
- public PartitionViolationException(TsFileResource resource) {
- super(String.format("The data of file %s cross partitions", resource));
+ public PartitionViolationException(String file) {
+ super(String.format("The data of file %s cross partitions", file));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 039cb92..b1f5101 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -63,6 +63,10 @@ public class FilePathUtils {
return resource.getTsFile().getAbsolutePath().split(PATH_SPLIT_STRING);
}
+ public static String[] splitTsFilePath(String tsFileAbsolutePath) {
+ return tsFileAbsolutePath.split(PATH_SPLIT_STRING);
+ }
+
/**
* get paths from group by level, like root.sg1.d2.s0, root.sg1.d1.s0
* level=1, return [root.sg1.*.s0, 0] and pathIndex turns to be [[0, root.sg1.*.s0],
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 319fdd1..154a501 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -34,10 +34,12 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -53,6 +55,9 @@ import org.slf4j.LoggerFactory;
public class IoTDBLoadExternalTsfileIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileIT.class);
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private static String[] insertSequenceSqls = new String[]{
"SET STORAGE GROUP TO root.vehicle",
"SET STORAGE GROUP TO root.test",
@@ -340,10 +345,17 @@ public class IoTDBLoadExternalTsfileIT {
.getSequenceFileTreeSet().size());
assertEquals(1, StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
.getUnSequenceFileList().size());
- assertEquals(1, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
- .getUnSequenceFileList().size());
- assertEquals(3, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet().size());
+ if (config.getTimeIndexLevel().equals(TimeIndexLevel.DEVICE_TIME_INDEX)) {
+ assertEquals(1, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
+ .getUnSequenceFileList().size());
+ assertEquals(3, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
+ .getSequenceFileTreeSet().size());
+ } else if (config.getTimeIndexLevel().equals(TimeIndexLevel.FILE_TIME_INDEX)) {
+ assertEquals(2, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
+ .getUnSequenceFileList().size());
+ assertEquals(2, StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
+ .getSequenceFileTreeSet().size());
+ }
assertNotNull(tmpDir.listFiles());
assertEquals(0, new File(tmpDir, new PartialPath("root.vehicle") + File.separator + "0").listFiles().length);
assertEquals(0, new File(tmpDir, new PartialPath("root.test") + File.separator + "0").listFiles().length);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index f4ff7cf..a4f737e 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -225,11 +225,9 @@ public class SeqTsFileRecoverTest {
writer.makeMetadataVisible();
assertEquals(11, writer.getMetadatasForQuery().size());
- assertEquals(2, resource.getStartTime("root.sg.device99"));
assertEquals(100, resource.getEndTime("root.sg.device99"));
for (int i = 0; i < 10; i++) {
assertEquals(0, resource.getStartTime("root.sg.device" + i));
- assertEquals(19, resource.getEndTime("root.sg.device" + i));
}
ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));