You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/12/25 11:47:31 UTC
[iotdb] 01/03: Move time arrays to DeviceTimeIndex
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch time_index
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit daf104c8d076bae8e3091c440f243e930fd05920
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Thu Dec 24 20:48:14 2020 +0800
Move time arrays to DeviceTimeIndex
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 6 +-
.../merge/selector/MaxFileMergeFileSelector.java | 10 +-
.../db/engine/merge/task/MergeMultiChunkTask.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 109 +++-----
.../db/engine/storagegroup/TsFileProcessor.java | 60 ++---
.../db/engine/storagegroup/TsFileResource.java | 291 ++++++---------------
.../storagegroup/timeindex/DeviceTimeIndex.java | 215 +++++++++++++++
.../iotdb/db/query/control/TracingManager.java | 2 +-
.../db/sync/receiver/load/FileLoaderManager.java | 2 +-
.../apache/iotdb/db/tools/IoTDBDataDirViewer.java | 2 +-
.../iotdb/db/tools/TsFileResourcePrinter.java | 9 +-
.../engine/storagegroup/TsFileProcessorTest.java | 4 +-
.../iotdb/db/integration/IoTDBClearCacheIT.java | 2 +-
.../iotdb/db/query/control/TracingManagerTest.java | 5 +-
.../db/sync/receiver/load/FileLoaderTest.java | 8 +-
.../recover/SyncReceiverLogAnalyzerTest.java | 4 +-
.../db/writelog/recover/DeviceStringTest.java | 8 +-
17 files changed, 396 insertions(+), 343 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 781836f..75fb14e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -674,11 +674,11 @@ public class StorageEngine implements IService {
public void loadNewTsFile(TsFileResource newTsFileResource)
throws LoadFileException, StorageEngineException, MetadataException {
- Map<String, Integer> deviceMap = newTsFileResource.getDeviceToIndexMap();
- if (deviceMap == null || deviceMap.isEmpty()) {
+ Set<String> deviceSet = newTsFileResource.getDevices();
+ if (deviceSet == null || deviceSet.isEmpty()) {
throw new StorageEngineException("Can not get the corresponding storage group.");
}
- String device = deviceMap.keySet().iterator().next();
+ String device = deviceSet.iterator().next();
PartialPath devicePath = new PartialPath(device);
PartialPath storageGroupPath = IoTDB.metaManager.getStorageGroupPath(devicePath);
getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
index 91049d1..165f810 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
@@ -213,16 +213,14 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
int tmpSelectedNum = 0;
- for (Entry<String, Integer> deviceStartTimeEntry : unseqFile.getDeviceToIndexMap().entrySet()) {
- String deviceId = deviceStartTimeEntry.getKey();
- int deviceIndex = deviceStartTimeEntry.getValue();
- long unseqStartTime = unseqFile.getStartTime(deviceIndex);
- long unseqEndTime = unseqFile.getEndTime(deviceIndex);
+ for (String deviceId : unseqFile.getDevices()) {
+ long unseqStartTime = unseqFile.getStartTime(deviceId);
+ long unseqEndTime = unseqFile.getEndTime(deviceId);
boolean noMoreOverlap = false;
for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (seqSelected[i] || !seqFile.getDeviceToIndexMap().containsKey(deviceId)) {
+ if (seqSelected[i] || !seqFile.getDevices().contains(deviceId)) {
continue;
}
long seqEndTime = seqFile.getEndTime(deviceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index ae58cf7..7227ea1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -220,7 +220,7 @@ public class MergeMultiChunkTask {
mergeFileWriter.writeVersion(0L);
mergeFileWriter.endChunkGroup();
mergeLogger.logFilePosition(mergeFileWriter.getFile());
- currTsFile.putStartTime(deviceId, currDeviceMinTime);
+ currTsFile.updateStartTime(deviceId, currDeviceMinTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 955deb4..acee17f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -337,10 +337,8 @@ public class StorageGroupProcessor {
for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
- for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
- String deviceId = entry.getKey();
- int index = entry.getValue();
- long endTime = resource.getEndTime(index);
+ for (String deviceId : resource.getDevices()) {
+ long endTime = resource.getEndTime(deviceId);
endTimeMap.put(deviceId, endTime);
}
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
@@ -376,17 +374,15 @@ public class StorageGroupProcessor {
storageGroupSysDir.getPath());
long currentVersion = versionController.currVersion();
for (TsFileResource resource : upgradeSeqFileList) {
- for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
- String deviceId = entry.getKey();
- int index = entry.getValue();
- long endTime = resource.getEndTime(index);
+ for (String deviceId : resource.getDevices()) {
+ long endTime = resource.getEndTime(deviceId);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
.put(deviceId, endTime);
globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
// set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
- long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
+ long partitionId = StorageEngine.getTimePartition(resource.getStartTime(deviceId));
while (partitionId <= endTimePartitionId) {
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
.put(deviceId, Long.MAX_VALUE);
@@ -570,8 +566,6 @@ public class StorageGroupProcessor {
continue;
}
-
-
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
@@ -870,7 +864,8 @@ public class StorageGroupProcessor {
}
}
- private void insertToTsFileProcessor(InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId)
+ private void insertToTsFileProcessor(InsertRowPlan insertRowPlan, boolean sequence,
+ long timePartitionId)
throws WriteProcessException {
TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -924,7 +919,7 @@ public class StorageGroupProcessor {
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
writeLock();
try {
- if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
+ if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
!closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
}
@@ -1098,7 +1093,7 @@ public class StorageGroupProcessor {
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
return;
}
@@ -1399,7 +1394,7 @@ public class StorageGroupProcessor {
context.setQueryTimeLowerBound(timeLowerBound);
for (TsFileResource tsFileResource : tsFileResources) {
- if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq)) {
+ if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1420,7 +1415,7 @@ public class StorageGroupProcessor {
}
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq)) {
+ if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1434,43 +1429,6 @@ public class StorageGroupProcessor {
}
/**
- * @return true if the device is contained in the TsFile and it lives beyond TTL
- */
- private boolean isTsFileResourceSatisfied(TsFileResource tsFileResource, String deviceId,
- Filter timeFilter, boolean isSeq) {
- if (!tsFileResource.containsDevice(deviceId)) {
- if (config.isDebugOn()) {
- DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of no device!", deviceId,
- tsFileResource);
- }
- return false;
- }
-
- int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
- long startTime = tsFileResource.getStartTime(deviceIndex);
- long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
- : Long.MAX_VALUE;
-
- if (!isAlive(endTime)) {
- if (config.isDebugOn()) {
- DEBUG_LOGGER
- .info("Path: {} file {} is not satisfied because of ttl!", deviceId, tsFileResource);
- }
- return false;
- }
-
- if (timeFilter != null) {
- boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
- if (config.isDebugOn() && !res) {
- DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of time filter!", deviceId,
- tsFileResource);
- }
- return res;
- }
- return true;
- }
-
- /**
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
@@ -1558,10 +1516,14 @@ public class StorageGroupProcessor {
private boolean canSkipDelete(TsFileResource tsFileResource, Set<PartialPath> devicePaths,
long deleteStart, long deleteEnd) {
for (PartialPath device : devicePaths) {
- if (tsFileResource.containsDevice(device.getFullPath()) &&
- (deleteEnd >= tsFileResource.getStartTime(device.getFullPath()) &&
- deleteStart <= tsFileResource
- .getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
+ String deviceId = device.getFullPath();
+ long endTime = tsFileResource.getEndTime(deviceId);
+ if (endTime == Long.MIN_VALUE) {
+ endTime = Long.MAX_VALUE;
+ }
+
+ if (tsFileResource.containsDevice(deviceId) &&
+ (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart <= endTime)) {
return false;
}
}
@@ -1628,9 +1590,8 @@ public class StorageGroupProcessor {
*/
private void updateEndTimeMap(TsFileProcessor tsFileProcessor) {
TsFileResource resource = tsFileProcessor.getTsFileResource();
- for (Entry<String, Integer> startTime : resource.getDeviceToIndexMap().entrySet()) {
- String deviceId = startTime.getKey();
- resource.forceUpdateEndTime(deviceId,
+ for (String deviceId : resource.getDevices()) {
+ resource.updateEndTime(deviceId,
latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
}
}
@@ -1755,9 +1716,9 @@ public class StorageGroupProcessor {
List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
for (TsFileResource resource : upgradedResources) {
long partitionId = resource.getTimePartition();
- resource.getDeviceToIndexMap().forEach((device, index) ->
+ resource.getDevices().forEach(device ->
updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
- resource.getEndTime(index))
+ resource.getEndTime(device))
);
}
insertLock.writeLock().lock();
@@ -1976,8 +1937,8 @@ public class StorageGroupProcessor {
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
- for (String device : fileA.getDeviceToIndexMap().keySet()) {
- if (!fileB.getDeviceToIndexMap().containsKey(device)) {
+ for (String device : fileA.getDevices()) {
+ if (!fileB.getDevices().contains(device)) {
continue;
}
long startTimeA = fileA.getStartTime(device);
@@ -2057,7 +2018,8 @@ public class StorageGroupProcessor {
private void removeFullyOverlapFile(TsFileResource tsFileResource,
Iterator<TsFileResource> iterator
, boolean isSeq) {
- logger.info("Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed());
+ logger
+ .info("Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed());
if (!tsFileResource.isClosed()) {
try {
// also remove the TsFileProcessor if the overlapped file is not closed
@@ -2145,10 +2107,8 @@ public class StorageGroupProcessor {
* @UsedBy sync module, load external tsfile module.
*/
private void updateLatestTimeMap(TsFileResource newTsFileResource) {
- for (Entry<String, Integer> entry : newTsFileResource.getDeviceToIndexMap().entrySet()) {
- String device = entry.getKey();
- int index = entry.getValue();
- long endTime = newTsFileResource.getEndTime(index);
+ for (String device : newTsFileResource.getDevices()) {
+ long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = StorageEngine.getTimePartition(endTime);
if (!latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.containsKey(device)
@@ -2397,10 +2357,13 @@ public class StorageGroupProcessor {
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
// examine working processor first as they have the largest plan index
- return isFileAlreadyExistInWorking(tsFileResource, partitionNum, getWorkSequenceTsFileProcessors()) ||
- isFileAlreadyExistInWorking(tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors()) ||
- isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileTreeSet()) ||
- isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList());
+ return
+ isFileAlreadyExistInWorking(tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ ||
+ isFileAlreadyExistInWorking(tsFileResource, partitionNum,
+ getWorkUnsequenceTsFileProcessors()) ||
+ isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileTreeSet()) ||
+ isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList());
}
private boolean isFileAlreadyExistInClosed(TsFileResource tsFileResource, long partitionNum,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 68e8d86..18c16d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -46,9 +46,9 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -144,7 +144,8 @@ public class TsFileProcessor {
}
@SuppressWarnings("java:S107") // ignore number of arguments
- public TsFileProcessor(String storageGroupName, StorageGroupInfo storageGroupInfo, TsFileResource tsFileResource,
+ public TsFileProcessor(String storageGroupName, StorageGroupInfo storageGroupInfo,
+ TsFileResource tsFileResource,
VersionController versionController, CloseFileListener closeUnsealedTsFileProcessor,
UpdateEndTimeCallBack updateLatestFlushTimeCallback, boolean sequence,
RestorableTsFileIOWriter writer) {
@@ -204,9 +205,9 @@ public class TsFileProcessor {
* the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
*/
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
TSStatus[] results) throws WriteProcessException {
@@ -255,7 +256,7 @@ public class TsFileProcessor {
tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
}
- private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
@@ -274,8 +275,7 @@ public class TsFileProcessor {
chunkMetadataIncrement += ChunkMetadata.calculateRamSize(insertRowPlan.getMeasurements()[i],
insertRowPlan.getDataTypes()[i]);
memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
- }
- else {
+ } else {
// here currentChunkPointNum >= 1
int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId,
insertRowPlan.getMeasurements()[i]);
@@ -287,7 +287,7 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
}
}
- updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
+ updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
chunkMetadataIncrement, textDataIncrement);
}
@@ -314,12 +314,12 @@ public class TsFileProcessor {
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
- updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
+ updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
chunkMetadataIncrement, textDataIncrement);
}
private void updateMemCost(TSDataType dataType, String measurement, String deviceId, int start,
- int end, long[] memIncrements, Object column) {
+ int end, long[] memIncrements, Object column) {
// memIncrements = [memTable, text, chunk metadata] respectively
if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurement)) {
@@ -327,15 +327,13 @@ public class TsFileProcessor {
memIncrements[2] += ChunkMetadata.calculateRamSize(measurement, dataType);
memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemSize(dataType);
- }
- else {
+ } else {
int currentChunkPointNum = workMemTable
.getCurrentChunkPointNum(deviceId, measurement);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemSize(dataType);
- }
- else {
+ } else {
int acquireArray =
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
/ PrimitiveArrayManager.ARRAY_SIZE;
@@ -494,7 +492,8 @@ public class TsFileProcessor {
+ "size: {}, plan index: [{}, {}]",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
workMemTable.memSize(),
- tsFileResource.getTsFileSize(), workMemTable.getMinPlanIndex(), workMemTable.getMaxPlanIndex());
+ tsFileResource.getTsFileSize(), workMemTable.getMinPlanIndex(),
+ workMemTable.getMaxPlanIndex());
} else {
logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
@@ -675,8 +674,8 @@ public class TsFileProcessor {
storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
if (logger.isDebugEnabled()) {
logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, "
- + "flushing memtable list size: {}", storageGroupName,
- tsFileResource.getTsFile().getName(), flushingMemTables.size());
+ + "flushing memtable list size: {}", storageGroupName,
+ tsFileResource.getTsFile().getName(), flushingMemTables.size());
}
// report to System
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
@@ -709,7 +708,8 @@ public class TsFileProcessor {
if (!memTableToFlush.isSignalMemTable()) {
try {
writer.mark();
- MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+ MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer,
+ storageGroupName);
flushTask.syncFlushMemTable();
} catch (Exception e) {
if (writer == null) {
@@ -719,8 +719,9 @@ public class TsFileProcessor {
flushingMemTables.notifyAll();
}
} else {
- logger.error("{}: {} meet error when flushing a memtable, change system mode to read-only",
- storageGroupName, tsFileResource.getTsFile().getName(), e);
+ logger
+ .error("{}: {} meet error when flushing a memtable, change system mode to read-only",
+ storageGroupName, tsFileResource.getTsFile().getName(), e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
try {
logger.error("{}: {} IOTask meets error, truncate the corrupted data", storageGroupName,
@@ -886,18 +887,19 @@ public class TsFileProcessor {
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
*
- * @param deviceId device id
+ * @param deviceId device id
* @param measurementId measurements id
- * @param dataType data type
- * @param encoding encoding
+ * @param dataType data type
+ * @param encoding encoding
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void query(String deviceId, String measurementId, TSDataType dataType, TSEncoding encoding,
Map<String, String> props, QueryContext context,
List<TsFileResource> tsfileResourcesForQuery) throws IOException, MetadataException {
if (logger.isDebugEnabled()) {
- logger.debug("{}: {} get flushQueryLock and hotCompactionMergeLock read lock", storageGroupName,
- tsFileResource.getTsFile().getName());
+ logger
+ .debug("{}: {} get flushQueryLock and hotCompactionMergeLock read lock", storageGroupName,
+ tsFileResource.getTsFile().getName());
}
flushQueryLock.readLock().lock();
try {
@@ -932,10 +934,8 @@ public class TsFileProcessor {
// get in memory data
if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) {
- tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getTsFile(),
- tsFileResource.getDeviceToIndexMap(),
- tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), readOnlyMemChunks,
- chunkMetadataList, tsFileResource));
+ tsfileResourcesForQuery
+ .add(new TsFileResource(readOnlyMemChunks, chunkMetadataList, tsFileResource));
}
} catch (QueryProcessException e) {
logger.error("{}: {} get ReadOnlyMemChunk has error", storageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b3fd09f..2d7ae26 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -26,23 +26,22 @@ import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
-import org.apache.iotdb.db.rescon.CachedStringPool;
import org.apache.iotdb.db.service.UpgradeSevice;
-import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -51,7 +50,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,31 +59,16 @@ import org.slf4j.LoggerFactory;
public class TsFileResource {
private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class);
- private static Map<String, String> cachedDevicePool = CachedStringPool.getInstance()
- .getCachedPool();
+
+ private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// tsfile
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
static final String TEMP_SUFFIX = ".temp";
- protected static final int INIT_ARRAY_SIZE = 64;
-
- /**
- * start times array.
- */
- protected long[] startTimes;
-
- /**
- * end times array. The values in this array are Long.MIN_VALUE if it's an unsealed sequence
- * tsfile
- */
- protected long[] endTimes;
-
- /**
- * device -> index of start times array and end times array
- */
- protected Map<String, Integer> deviceToIndex;
public TsFileProcessor getProcessor() {
return processor;
@@ -92,6 +76,8 @@ public class TsFileResource {
private TsFileProcessor processor;
+ private DeviceTimeIndex fileIndex;
+
private ModificationFile modFile;
private volatile boolean closed = false;
@@ -159,10 +145,8 @@ public class TsFileResource {
public TsFileResource(TsFileResource other) throws IOException {
this.file = other.file;
- this.deviceToIndex = other.deviceToIndex;
- this.startTimes = other.startTimes;
- this.endTimes = other.endTimes;
this.processor = other.processor;
+ this.fileIndex = other.fileIndex;
this.modFile = other.modFile;
this.closed = other.closed;
this.deleted = other.deleted;
@@ -181,11 +165,7 @@ public class TsFileResource {
*/
public TsFileResource(File file) {
this.file = file;
- this.deviceToIndex = new ConcurrentHashMap<>();
- this.startTimes = new long[INIT_ARRAY_SIZE];
- this.endTimes = new long[INIT_ARRAY_SIZE];
- initTimes(startTimes, Long.MAX_VALUE);
- initTimes(endTimes, Long.MIN_VALUE);
+ this.fileIndex = new DeviceTimeIndex();
}
/**
@@ -193,31 +173,31 @@ public class TsFileResource {
*/
public TsFileResource(File file, TsFileProcessor processor) {
this.file = file;
- this.deviceToIndex = new ConcurrentHashMap<>();
- this.startTimes = new long[INIT_ARRAY_SIZE];
- this.endTimes = new long[INIT_ARRAY_SIZE];
- initTimes(startTimes, Long.MAX_VALUE);
- initTimes(endTimes, Long.MIN_VALUE);
+ this.fileIndex = new DeviceTimeIndex();
this.processor = processor;
}
/**
* unsealed TsFile
*/
- public TsFileResource(File file, Map<String, Integer> deviceToIndex, long[] startTimes,
- long[] endTimes, List<ReadOnlyMemChunk> readOnlyMemChunk,
+ public TsFileResource(List<ReadOnlyMemChunk> readOnlyMemChunk,
List<ChunkMetadata> chunkMetadataList, TsFileResource originTsFileResource)
throws IOException {
- this.file = file;
- this.deviceToIndex = deviceToIndex;
- this.startTimes = startTimes;
- this.endTimes = endTimes;
+ this.file = originTsFileResource.file;
+ this.fileIndex = originTsFileResource.fileIndex;
this.chunkMetadataList = chunkMetadataList;
this.readOnlyMemChunk = readOnlyMemChunk;
this.originTsFileResource = originTsFileResource;
generateTimeSeriesMetadata();
}
+ @TestOnly
+ public TsFileResource(File file, Map<String, Integer> deviceToIndex, long[] startTimes,
+ long[] endTimes) {
+ this.file = file;
+ this.fileIndex = new DeviceTimeIndex(deviceToIndex, startTimes, endTimes);
+ }
+
private void generateTimeSeriesMetadata() throws IOException {
timeSeriesMetadata = new TimeseriesMetadata();
timeSeriesMetadata.setOffsetOfChunkMetaDataList(-1);
@@ -251,23 +231,10 @@ public class TsFileResource {
}
}
- protected void initTimes(long[] times, long defaultTime) {
- Arrays.fill(times, defaultTime);
- }
-
public synchronized void serialize() throws IOException {
try (OutputStream outputStream = fsFactory.getBufferedOutputStream(
file + RESOURCE_SUFFIX + TEMP_SUFFIX)) {
- ReadWriteIOUtils.write(this.deviceToIndex.size(), outputStream);
- for (Entry<String, Integer> entry : this.deviceToIndex.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), outputStream);
- ReadWriteIOUtils.write(startTimes[entry.getValue()], outputStream);
- }
- ReadWriteIOUtils.write(this.deviceToIndex.size(), outputStream);
- for (Entry<String, Integer> entry : this.deviceToIndex.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), outputStream);
- ReadWriteIOUtils.write(endTimes[entry.getValue()], outputStream);
- }
+ fileIndex.serialize(outputStream);
ReadWriteIOUtils.write(maxPlanIndex, outputStream);
ReadWriteIOUtils.write(minPlanIndex, outputStream);
@@ -286,29 +253,7 @@ public class TsFileResource {
public void deserialize() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(
file + RESOURCE_SUFFIX)) {
- int size = ReadWriteIOUtils.readInt(inputStream);
- Map<String, Integer> deviceMap = new HashMap<>();
- long[] startTimesArray = new long[size];
- long[] endTimesArray = new long[size];
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- // To reduce the String number in memory,
- // use the deviceId from memory instead of the deviceId read from disk
- String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
- deviceMap.put(cachedPath, i);
- startTimesArray[i] = time;
- }
- size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- ReadWriteIOUtils.readString(inputStream); // String path
- long time = ReadWriteIOUtils.readLong(inputStream);
- endTimesArray[i] = time;
- }
- this.startTimes = startTimesArray;
- this.endTimes = endTimesArray;
- this.deviceToIndex = deviceMap;
-
+ fileIndex = DeviceTimeIndex.deserialize(inputStream);
maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
@@ -325,14 +270,14 @@ public class TsFileResource {
public void updateStartTime(String device, long time) {
long startTime = getStartTime(device);
if (time < startTime) {
- putStartTime(device, time);
+ fileIndex.putStartTime(device, time);
}
}
public void updateEndTime(String device, long time) {
long endTime = getEndTime(device);
if (time > endTime) {
- putEndTime(device, time);
+ fileIndex.putEndTime(device, time);
}
}
@@ -340,10 +285,6 @@ public class TsFileResource {
return fsFactory.getFile(file + RESOURCE_SUFFIX).exists();
}
- void forceUpdateEndTime(String device, long time) {
- putEndTime(device, time);
- }
-
public List<ChunkMetadata> getChunkMetadataList() {
return new ArrayList<>(chunkMetadataList);
}
@@ -364,7 +305,7 @@ public class TsFileResource {
}
boolean containsDevice(String deviceId) {
- return deviceToIndex.containsKey(deviceId);
+ return fileIndex.containsDevice(deviceId);
}
public File getTsFile() {
@@ -380,100 +321,26 @@ public class TsFileResource {
}
public long getStartTime(String deviceId) {
- if (!deviceToIndex.containsKey(deviceId)) {
- return Long.MAX_VALUE;
- }
- return startTimes[deviceToIndex.get(deviceId)];
- }
-
- public long getStartTime(int index) {
- return startTimes[index];
+ return fileIndex.getStartTime(deviceId);
}
public long getEndTime(String deviceId) {
- if (!deviceToIndex.containsKey(deviceId)) {
- return Long.MIN_VALUE;
- }
- return endTimes[deviceToIndex.get(deviceId)];
- }
-
- public long getEndTime(int index) {
- return endTimes[index];
- }
-
- public long getOrDefaultStartTime(String deviceId, long defaultTime) {
- long startTime = getStartTime(deviceId);
- return startTime != Long.MAX_VALUE ? startTime : defaultTime;
- }
-
- public long getOrDefaultEndTime(String deviceId, long defaultTime) {
- long endTime = getEndTime(deviceId);
- return endTime != Long.MIN_VALUE ? endTime : defaultTime;
- }
-
- public void putStartTime(String deviceId, long startTime) {
- int index = getDeviceIndex(deviceId);
- startTimes[index] = startTime;
- }
-
- public void putEndTime(String deviceId, long endTime) {
- int index = getDeviceIndex(deviceId);
- endTimes[index] = endTime;
- }
-
- private int getDeviceIndex(String deviceId) {
- int index;
- if (containsDevice(deviceId)) {
- index = deviceToIndex.get(deviceId);
- } else {
- index = deviceToIndex.size();
- deviceToIndex.put(deviceId, index);
- if (startTimes.length <= index) {
- startTimes = enLargeArray(startTimes, Long.MAX_VALUE);
- endTimes = enLargeArray(endTimes, Long.MIN_VALUE);
- }
- }
- return index;
+ return fileIndex.getEndTime(deviceId);
}
- private long[] enLargeArray(long[] array, long defaultValue) {
- long[] tmp = new long[(int) (array.length * 1.5)];
- initTimes(tmp, defaultValue);
- System.arraycopy(array, 0, tmp, 0, array.length);
- return tmp;
- }
-
- public Map<String, Integer> getDeviceToIndexMap() {
- return deviceToIndex;
- }
-
- public long[] getStartTimes() {
- return startTimes;
- }
-
- public long[] getEndTimes() {
- return endTimes;
- }
-
- public void clearEndTimes() {
- endTimes = new long[endTimes.length];
- initTimes(endTimes, Long.MIN_VALUE);
+ public Set<String> getDevices() {
+ return fileIndex.getDeviceToIndex().keySet();
}
public boolean areEndTimesEmpty() {
- for (long endTime : endTimes) {
- if (endTime != -1) {
+ for (long endTime : fileIndex.getEndTimes()) {
+ if (endTime != Long.MIN_VALUE) {
return false;
}
}
return true;
}
- private void trimStartEndTimes() {
- startTimes = Arrays.copyOfRange(startTimes, 0, deviceToIndex.size());
- endTimes = Arrays.copyOfRange(endTimes, 0, deviceToIndex.size());
- }
-
public boolean isClosed() {
return closed;
}
@@ -486,14 +353,13 @@ public class TsFileResource {
}
processor = null;
chunkMetadataList = null;
- trimStartEndTimes();
+ fileIndex.trimStartEndTimes();
}
TsFileProcessor getUnsealedFileProcessor() {
return processor;
}
-
public void writeLock() {
if (originTsFileResource == null) {
tsFileLock.writeLock();
@@ -575,8 +441,11 @@ public class TsFileResource {
fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
fsFactory.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX));
- fsFactory.moveFile(fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX),
- fsFactory.getFile(targetDir, file.getName() + ModificationFile.FILE_SUFFIX));
+ File modFile = fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX);
+ if (modFile.exists()) {
+ fsFactory.moveFile(modFile, fsFactory.getFile(targetDir,
+ file.getName() + ModificationFile.FILE_SUFFIX));
+ }
}
@Override
@@ -628,7 +497,7 @@ public class TsFileResource {
if (timeLowerBound == Long.MAX_VALUE) {
return true;
}
- for (long endTime : endTimes) {
+ for (long endTime : fileIndex.getEndTimes()) {
// the file cannot be deleted if any device still lives
if (endTime >= timeLowerBound) {
return true;
@@ -637,12 +506,46 @@ public class TsFileResource {
return false;
}
- protected void setStartTimes(long[] startTimes) {
- this.startTimes = startTimes;
+ /**
+ * @return true if the device is contained in the TsFile and it lives beyond TTL
+ */
+ public boolean isSatisfied(String deviceId,
+ Filter timeFilter, boolean isSeq, long ttl) {
+ if (!containsDevice(deviceId)) {
+ if (config.isDebugOn()) {
+ DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of no device!", deviceId,
+ file);
+ }
+ return false;
+ }
+
+ long startTime = getStartTime(deviceId);
+ long endTime = closed || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE;
+
+ if (!isAlive(endTime, ttl)) {
+ if (config.isDebugOn()) {
+ DEBUG_LOGGER
+ .info("Path: {} file {} is not satisfied because of ttl!", deviceId, file);
+ }
+ return false;
+ }
+
+ if (timeFilter != null) {
+ boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+ if (config.isDebugOn() && !res) {
+ DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of time filter!", deviceId,
+ fsFactory);
+ }
+ return res;
+ }
+ return true;
}
- protected void setEndTimes(long[] endTimes) {
- this.endTimes = endTimes;
+ /**
+ * @return whether the given time falls in ttl
+ */
+ private boolean isAlive(long time, long dataTTL) {
+ return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
}
public void setProcessor(TsFileProcessor processor) {
@@ -682,15 +585,7 @@ public class TsFileResource {
* make sure Either the deviceToIndex is not empty Or the path contains a partition folder
*/
public long getTimePartition() {
- try {
- if (deviceToIndex != null && !deviceToIndex.isEmpty()) {
- return StorageEngine.getTimePartition(startTimes[deviceToIndex.values().iterator().next()]);
- }
- String[] splits = FilePathUtils.splitTsFilePath(this);
- return Long.parseLong(splits[splits.length - 2]);
- } catch (NumberFormatException e) {
- return 0;
- }
+ return fileIndex.getTimePartition(this);
}
/**
@@ -701,7 +596,7 @@ public class TsFileResource {
*/
public long getTimePartitionWithCheck() throws PartitionViolationException {
long partitionId = -1;
- for (Long startTime : startTimes) {
+ for (Long startTime : fileIndex.getStartTimes()) {
long p = StorageEngine.getTimePartition(startTime);
if (partitionId == -1) {
partitionId = p;
@@ -711,7 +606,7 @@ public class TsFileResource {
}
}
}
- for (Long endTime : endTimes) {
+ for (Long endTime : fileIndex.getEndTimes()) {
long p = StorageEngine.getTimePartition(endTime);
if (partitionId == -1) {
partitionId = p;
@@ -777,8 +672,7 @@ public class TsFileResource {
* @return initial resource map size
*/
public long calculateRamSize() {
- return RamUsageEstimator.sizeOf(deviceToIndex) + RamUsageEstimator.sizeOf(startTimes) +
- RamUsageEstimator.sizeOf(endTimes);
+ return fileIndex.calculateRamSize();
}
/**
@@ -787,20 +681,7 @@ public class TsFileResource {
* @return ramIncrement
*/
public long estimateRamIncrement(String deviceToBeChecked) {
- long ramIncrement = 0L;
- if (!containsDevice(deviceToBeChecked)) {
- // 80 is the Map.Entry header ram size
- if (deviceToIndex.isEmpty()) {
- ramIncrement += 80;
- }
- // Map.Entry ram size
- ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16;
- // if needs to extend the startTimes and endTimes arrays
- if (deviceToIndex.size() >= startTimes.length) {
- ramIncrement += startTimes.length * Long.BYTES;
- }
- }
- return ramIncrement;
+ return fileIndex.estimateRamIncrement(deviceToBeChecked);
}
public void delete() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
new file mode 100644
index 0000000..bc96380
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup.timeindex;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.CachedStringPool;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+public class DeviceTimeIndex {
+
+ private static final Map<String, String> cachedDevicePool = CachedStringPool.getInstance()
+ .getCachedPool();
+
+ protected static final int INIT_ARRAY_SIZE = 64;
+
+ /**
+ * start times array.
+ */
+ protected long[] startTimes;
+
+ /**
+ * end times array. The values in this array are Long.MIN_VALUE if it's an unsealed sequence
+ * tsfile
+ */
+ protected long[] endTimes;
+
+ /**
+ * device -> index of start times array and end times array
+ */
+ protected Map<String, Integer> deviceToIndex;
+
+ public DeviceTimeIndex() {
+ init();
+ }
+
+ public DeviceTimeIndex(Map<String, Integer> deviceToIndex, long[] startTimes, long[] endTimes) {
+ this.startTimes = startTimes;
+ this.endTimes = endTimes;
+ this.deviceToIndex = deviceToIndex;
+ }
+
+ public void init() {
+ this.deviceToIndex = new ConcurrentHashMap<>();
+ this.startTimes = new long[INIT_ARRAY_SIZE];
+ this.endTimes = new long[INIT_ARRAY_SIZE];
+ initTimes(startTimes, Long.MAX_VALUE);
+ initTimes(endTimes, Long.MIN_VALUE);
+ }
+
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(deviceToIndex.size(), outputStream);
+ for (Entry<String, Integer> entry : deviceToIndex.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(startTimes[entry.getValue()], outputStream);
+ ReadWriteIOUtils.write(endTimes[entry.getValue()], outputStream);
+ }
+ }
+
+ public static DeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ Map<String, Integer> deviceMap = new HashMap<>();
+ long[] startTimesArray = new long[size];
+ long[] endTimesArray = new long[size];
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ // To reduce the String number in memory,
+ // use the deviceId from memory instead of the deviceId read from disk
+ String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
+ deviceMap.put(cachedPath, i);
+
+ startTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
+ endTimesArray[i] =ReadWriteIOUtils.readLong(inputStream);
+ }
+ return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
+ }
+
+ public long[] getStartTimes() {
+ return startTimes;
+ }
+
+ public void setStartTimes(long[] startTimes) {
+ this.startTimes = startTimes;
+ }
+
+ public long[] getEndTimes() {
+ return endTimes;
+ }
+
+ public void setEndTimes(long[] endTimes) {
+ this.endTimes = endTimes;
+ }
+
+ public Map<String, Integer> getDeviceToIndex() {
+ return deviceToIndex;
+ }
+
+ public long calculateRamSize() {
+ return RamUsageEstimator.sizeOf(deviceToIndex) + RamUsageEstimator.sizeOf(startTimes) +
+ RamUsageEstimator.sizeOf(endTimes);
+ }
+
+ public long estimateRamIncrement(String deviceToBeChecked) {
+ long ramIncrement = 0L;
+ if (!containsDevice(deviceToBeChecked)) {
+ // 80 is the Map.Entry header ram size
+ if (deviceToIndex.isEmpty()) {
+ ramIncrement += 80;
+ }
+ // Map.Entry ram size
+ ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16;
+ // if needs to extend the startTimes and endTimes arrays
+ if (deviceToIndex.size() >= startTimes.length) {
+ ramIncrement += startTimes.length * Long.BYTES;
+ }
+ }
+ return ramIncrement;
+ }
+
+ public boolean containsDevice(String deviceId) {
+ return deviceToIndex.containsKey(deviceId);
+ }
+
+ public void trimStartEndTimes() {
+ startTimes = Arrays.copyOfRange(startTimes, 0, deviceToIndex.size());
+ endTimes = Arrays.copyOfRange(endTimes, 0, deviceToIndex.size());
+ }
+
+ public int getDeviceIndex(String deviceId) {
+ int index;
+ if (containsDevice(deviceId)) {
+ index = deviceToIndex.get(deviceId);
+ } else {
+ index = deviceToIndex.size();
+ deviceToIndex.put(deviceId, index);
+ if (startTimes.length <= index) {
+ startTimes = enLargeArray(startTimes, Long.MAX_VALUE);
+ endTimes = enLargeArray(endTimes, Long.MIN_VALUE);
+ }
+ }
+ return index;
+ }
+
+ private void initTimes(long[] times, long defaultTime) {
+ Arrays.fill(times, defaultTime);
+ }
+
+ private long[] enLargeArray(long[] array, long defaultValue) {
+ long[] tmp = new long[(int) (array.length * 1.5)];
+ initTimes(tmp, defaultValue);
+ System.arraycopy(array, 0, tmp, 0, array.length);
+ return tmp;
+ }
+
+ public long getTimePartition(TsFileResource resource) {
+ try {
+ if (deviceToIndex != null && !deviceToIndex.isEmpty()) {
+ return StorageEngine.getTimePartition(startTimes[deviceToIndex.values().iterator().next()]);
+ }
+ String[] splits = FilePathUtils.splitTsFilePath(resource);
+ return Long.parseLong(splits[splits.length - 2]);
+ } catch (NumberFormatException e) {
+ return 0;
+ }
+ }
+
+ public void putStartTime(String deviceId, long startTime) {
+ startTimes[getDeviceIndex(deviceId)] = startTime;
+ }
+
+ public void putEndTime(String deviceId, long endTime) {
+ endTimes[getDeviceIndex(deviceId)] = endTime;
+ }
+
+ public long getStartTime(String deviceId) {
+ if (!deviceToIndex.containsKey(deviceId)) {
+ return Long.MAX_VALUE;
+ }
+ return startTimes[deviceToIndex.get(deviceId)];
+ }
+
+ public long getEndTime(String deviceId) {
+ if (!deviceToIndex.containsKey(deviceId)) {
+ return Long.MIN_VALUE;
+ }
+ return endTimes[deviceToIndex.get(deviceId)];
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
index e6b9811..22a3d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
@@ -124,7 +124,7 @@ public class TracingManager {
// print startTime and endTime of each device, format e.g.: device1[1, 10000]
private void printTsFileStatistics(StringBuilder builder, TsFileResource tsFileResource) {
- Iterator<String> deviceIter = tsFileResource.getDeviceToIndexMap().keySet().iterator();
+ Iterator<String> deviceIter = tsFileResource.getDevices().iterator();
while (deviceIter.hasNext()) {
String device = deviceIter.next();
builder.append(" ").append(device)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
index 8160745..c28e74c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -95,7 +95,7 @@ public class FileLoaderManager {
throws SyncDeviceOwnerConflictException, IOException {
String curOwner = tsFileResource.getTsFile().getParentFile().getParentFile().getParentFile()
.getName();
- Set<String> deviceSet = tsFileResource.getDeviceToIndexMap().keySet();
+ Set<String> deviceSet = tsFileResource.getDevices();
checkDeviceConflict(curOwner, deviceSet);
updateDeviceOwner(curOwner, deviceSet);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java b/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java
index c1f23e1..321c1b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java
@@ -138,7 +138,7 @@ public class IoTDBDataDirViewer {
TsFileResource resource = new TsFileResource(SystemFileFactory.INSTANCE.getFile(filename));
resource.deserialize();
// sort device strings
- SortedSet<String> keys = new TreeSet<>(resource.getDeviceToIndexMap().keySet());
+ SortedSet<String> keys = new TreeSet<>(resource.getDevices());
for (String device : keys) {
printlnBoth(pw,
String.format("| | | | |--device %s, start time %d (%s), end time %d (%s)", device,
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java
index c0fc928..2810f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java
@@ -70,13 +70,10 @@ public class TsFileResourcePrinter {
System.out.printf("Resource plan index range [%d, %d]%n", resource.getMinPlanIndex(),
resource.getMaxPlanIndex());
- for (String device : resource.getDeviceToIndexMap().keySet()) {
+ for (String device : resource.getDevices()) {
System.out.printf(
- "device %s, "
- + "start time %d (%s), "
- + "end time %d (%s)%n",
- device,
- resource.getStartTime(device),
+ "device %s, start time %d (%s), end time %d (%s)%n",
+ device, resource.getStartTime(device),
DatetimeUtils.convertMillsecondToZonedDateTime(resource.getStartTime(device)),
resource.getEndTime(device),
DatetimeUtils.convertMillsecondToZonedDateTime(resource.getEndTime(device)));
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 9ba80bb..362ba9c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -320,8 +320,8 @@ public class TsFileProcessorTest {
throws TsFileProcessorException {
TsFileResource resource = unsealedTsFileProcessor.getTsFileResource();
synchronized (resource) {
- for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
- resource.putEndTime(entry.getKey(), resource.getStartTime(entry.getValue()));
+ for (String deviceId : resource.getDevices()) {
+ resource.updateEndTime(deviceId, resource.getStartTime(deviceId));
}
try {
resource.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBClearCacheIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBClearCacheIT.java
index 717642f..74a20dc 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBClearCacheIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBClearCacheIT.java
@@ -146,7 +146,7 @@ public class IoTDBClearCacheIT {
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
boolean hasResultSet = statement.execute(
- "select * from root where time>10");
+ "select * from root where time > 10");
assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/TracingManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/TracingManagerTest.java
index 25aa325..0e0545a 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/TracingManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/TracingManagerTest.java
@@ -96,15 +96,14 @@ public class TracingManagerTest {
}
}
- void prepareTsFileResources() throws IOException {
+ void prepareTsFileResources() {
Map<String, Integer> deviceToIndex = new HashMap<>();
deviceToIndex.put("root.sg.d1", 0);
deviceToIndex.put("root.sg.d2", 1);
long[] startTimes = {1, 2};
long[] endTimes = {999, 998};
File file1 = new File(TestConstant.OUTPUT_DATA_DIR.concat("1-1-0.tsfile"));
- TsFileResource tsFileResource1 = new TsFileResource(file1, deviceToIndex, startTimes, endTimes,
- null, null, null);
+ TsFileResource tsFileResource1 = new TsFileResource(file1, deviceToIndex, startTimes, endTimes);
tsFileResource1.setClosed(true);
seqResources.add(tsFileResource1);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 5b97f8f..f14b096 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -127,8 +127,8 @@ public class FileLoaderTest {
LOGGER.error("Can not create new file {}", syncFile.getPath());
}
TsFileResource tsFileResource = new TsFileResource(syncFile);
- tsFileResource.putStartTime(String.valueOf(i), (long) j * 10);
- tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5);
+ tsFileResource.updateStartTime(String.valueOf(i), (long) j * 10);
+ tsFileResource.updateEndTime(String.valueOf(i), (long) j * 10 + 5);
tsFileResource.setMaxPlanIndex(j);
tsFileResource.setMinPlanIndex(j);
@@ -227,8 +227,8 @@ public class FileLoaderTest {
LOGGER.error("Can not create new file {}", syncFile.getPath());
}
TsFileResource tsFileResource = new TsFileResource(syncFile);
- tsFileResource.putStartTime(String.valueOf(i), (long) j * 10);
- tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5);
+ tsFileResource.updateStartTime(String.valueOf(i), (long) j * 10);
+ tsFileResource.updateEndTime(String.valueOf(i), (long) j * 10 + 5);
tsFileResource.setMinPlanIndex(j);
tsFileResource.setMaxPlanIndex(j);
tsFileResource.serialize();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
index b81f051..1488277 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
@@ -137,8 +137,8 @@ public class SyncReceiverLogAnalyzerTest {
LOGGER.error("Can not create new file {}", syncFile.getPath());
}
TsFileResource tsFileResource = new TsFileResource(syncFile);
- tsFileResource.putStartTime(String.valueOf(i), (long) j * 10);
- tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5);
+ tsFileResource.updateStartTime(String.valueOf(i), (long) j * 10);
+ tsFileResource.updateEndTime(String.valueOf(i), (long) j * 10 + 5);
tsFileResource.serialize();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java
index 1762502..1d18f25 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java
@@ -94,8 +94,8 @@ public class DeviceStringTest {
writer.flushAllChunkGroups();
writer.getIOWriter().close();
- resource.putStartTime(new String("root.sg.device99"), 2);
- resource.putEndTime(new String("root.sg.device99"), 100);
+ resource.updateStartTime(new String("root.sg.device99"), 2);
+ resource.updateEndTime(new String("root.sg.device99"), 100);
resource.close();
resource.serialize();
}
@@ -111,8 +111,8 @@ public class DeviceStringTest {
public void testDeviceString() throws IOException, IllegalPathException {
resource = new TsFileResource(tsF);
resource.deserialize();
- assertFalse(resource.getDeviceToIndexMap().keySet().isEmpty());
- for (String device : resource.getDeviceToIndexMap().keySet()) {
+ assertFalse(resource.getDevices().isEmpty());
+ for (String device : resource.getDevices()) {
assertSame(device, mManager.getDeviceId(new PartialPath(device)));
}
}