You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/06/15 01:46:33 UTC
[iotdb] branch master updated: Add java doc (#3401)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e728789 Add java doc (#3401)
e728789 is described below
commit e72878973047758a84e92acfcfa0ddd15b509b70
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Tue Jun 15 09:46:04 2021 +0800
Add java doc (#3401)
---
.../apache/iotdb/db/engine/cache/ChunkCache.java | 5 ++
.../db/engine/cache/TimeSeriesMetadataCache.java | 27 +++++++
.../apache/iotdb/db/engine/flush/FlushManager.java | 7 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 9 +++
.../iotdb/db/engine/memtable/AbstractMemTable.java | 7 ++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 14 +++-
.../db/engine/querycontext/QueryDataSource.java | 5 ++
.../db/engine/querycontext/ReadOnlyMemChunk.java | 4 +
.../engine/storagegroup/StorageGroupProcessor.java | 92 +++++++++++++++++++++-
.../db/engine/storagegroup/TsFileProcessor.java | 44 ++++++++++-
.../db/engine/storagegroup/TsFileResource.java | 33 ++++++--
.../storagegroup/timeindex/TimeIndexLevel.java | 3 +
.../virtualSg/HashVirtualPartitioner.java | 1 +
.../virtualSg/VirtualStorageGroupManager.java | 2 +
.../version/SimpleFileVersionController.java | 1 +
.../iotdb/db/metadata/VectorPartialPath.java | 6 ++
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 16 ++++
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 19 ++++-
.../iotdb/db/metadata/template/Template.java | 31 ++++++++
.../db/qp/physical/crud/RawDataQueryPlan.java | 18 +++++
.../iotdb/db/query/reader/series/SeriesReader.java | 24 ++++++
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 19 ++++-
.../iotdb/tsfile/file/header/ChunkHeader.java | 3 +
.../tsfile/file/metadata/VectorChunkMetadata.java | 2 +
.../file/metadata/VectorTimeSeriesMetadata.java | 14 ++++
.../read/reader/chunk/VectorChunkReader.java | 20 ++++-
.../tsfile/read/reader/page/ValuePageReader.java | 8 ++
.../tsfile/read/reader/page/VectorChunkReader.java | 21 -----
.../tsfile/read/reader/page/VectorPageReader.java | 7 ++
29 files changed, 425 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 556075f..6a126cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -63,6 +63,11 @@ public class ChunkCache {
lruCache =
new LRULinkedHashMap<ChunkMetadata, Chunk>(MEMORY_THRESHOLD_IN_CHUNK_CACHE) {
+ /**
+ * The calculation is time consuming, so we won't calculate each entry's size each time.
+ * Every 100,000 entries, we will calculate the average size of the first 10 entries, and
+ * use that to represent the next 99,990 entries' size.
+ */
@Override
protected long calEntrySize(ChunkMetadata key, Chunk value) {
long currentSize;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 605a341..889ba9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -81,6 +81,11 @@ public class TimeSeriesMetadataCache {
new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>(
MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE) {
+ /**
+ * The calculation is time consuming, so we won't calculate each entry's size each time.
+ * Every 100,000 entries, we will calculate the average size of the first 10 entries, and
+ * use that to represent the next 99,990 entries' size.
+ */
@Override
protected long calEntrySize(TimeSeriesMetadataCacheKey key, TimeseriesMetadata value) {
long currentSize;
@@ -226,6 +231,14 @@ public class TimeSeriesMetadataCache {
}
}
+ /**
+ * Support for vector
+ *
+ * @param key vector's own fullPath, e.g. root.sg1.d1.vector
+ * @param subSensorList all subSensors of this vector in one query, e.g. [s1, s2, s3]
+ * @param allSensors all sensors of the device; for a vector, this should contain both the
+ * vector name and the subSensors' names, e.g. [vector, s1, s2, s3]
+ */
// Suppress synchronize warning
// Suppress high Cognitive Complexity warning
@SuppressWarnings({"squid:S1860", "squid:S3776"})
@@ -290,6 +303,10 @@ public class TimeSeriesMetadataCache {
try {
timeSeriesMetadataList.forEach(
metadata -> {
+ // for root.sg1.d1.vector1.s1, key.device of vector will only return root.sg1.d1
+ // metadata.getMeasurementId() will return s1, the vector1 is saved in
+ // key.measurement
+ // so we should concat them to get the deviceId for root.sg1.d1.vector1.s1
TimeSeriesMetadataCacheKey k =
new TimeSeriesMetadataCacheKey(
key.filePath,
@@ -327,6 +344,16 @@ public class TimeSeriesMetadataCache {
}
}
+ /**
+ * !!!Attention!!!
+ *
+ * <p>For a vector, e.g. root.sg1.d1.vector1(s1, s2), the TimeSeriesMetadataCacheKey for vector1
+ * should be {filePath: "./data/data/seq/.......", device: root.sg1.d1.vector1, measurement:
+ * vector1}, i.e. vector1 appears in both device and measurement. The TimeSeriesMetadataCacheKey
+ * for vector1.s1 should be {filePath: "./data/data/seq/.......", device: root.sg1.d1.vector1,
+ * measurement: s1}, and the TimeSeriesMetadataCacheKey for vector1.s2 should be {filePath:
+ * "./data/data/seq/.......", device: root.sg1.d1.vector1, measurement: s2}.
+ */
private void getVectorTimeSeriesMetadataListFromCache(
TimeSeriesMetadataCacheKey key, List<String> subSensorList, List<TimeseriesMetadata> res) {
lock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index b663999..32741bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -92,6 +92,7 @@ public class FlushManager implements FlushManagerMBean, IService {
return FlushSubTaskPoolManager.getInstance().getWaitingTasksNumber();
}
+ /** a flush thread handles flush task */
class FlushThread extends WrappedRunnable {
@Override
@@ -121,7 +122,11 @@ public class FlushManager implements FlushManagerMBean, IService {
}
}
- /** Add TsFileProcessor to asyncTryToFlush manager */
+ /**
+ * Add tsFileProcessor to asyncTryToFlush manager
+ *
+ * @param tsFileProcessor tsFileProcessor to be flushed
+ */
@SuppressWarnings("squid:S2445")
public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
synchronized (tsFileProcessor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 1d76495..7962da4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -46,6 +46,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+/**
+ * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
+ * encoding -> write to disk (io task)
+ */
public class MemTableFlushTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
@@ -115,6 +119,9 @@ public class MemTableFlushTask {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
IMeasurementSchema desc = series.getSchema();
+ /*
+ * sort task (first task of flush pipeline)
+ */
TVList tvList = series.getSortedTvListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.put(new Pair<>(tvList, desc));
@@ -158,6 +165,7 @@ public class MemTableFlushTask {
System.currentTimeMillis() - start);
}
+ /** encoding task (second task of pipeline) */
private Runnable encodingTask =
new Runnable() {
private void writeOneSeries(
@@ -343,6 +351,7 @@ public class MemTableFlushTask {
}
};
+ /** io task (third task of pipeline) */
@SuppressWarnings("squid:S135")
private Runnable ioTask =
() -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d62f8fa..170f49c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -95,6 +95,13 @@ public abstract class AbstractMemTable implements IMemTable {
return memTableMap.containsKey(deviceId) && memTableMap.get(deviceId).containsKey(measurement);
}
+ /**
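+ * create the writable mem chunk for the given device and measurement schema if it does not exist
+ *
+ * @param deviceId device id
+ * @param schema measurement schema
+ * @return the writable mem chunk for the given device and measurement schema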
+ */
private IWritableMemChunk createIfNotExistAndGet(String deviceId, IMeasurementSchema schema) {
Map<String, IWritableMemChunk> memSeries =
memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 5f74b01..bb28c57 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -71,9 +71,20 @@ public interface IMemTable {
long getTotalPointsNum();
+ /**
+ * insert into this memtable
+ *
+ * @param insertRowPlan insertRowPlan
+ */
void insert(InsertRowPlan insertRowPlan);
- /** [start, end) */
+ /**
+ * insert tablet into this memtable
+ *
+ * @param insertTabletPlan insertTabletPlan
+ * @param start included
+ * @param end excluded
+ */
void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException;
@@ -114,6 +125,7 @@ public interface IMemTable {
boolean shouldFlush();
+ /** release resource of this memtable */
void release();
/** must guarantee the device exists in the work memtable only used when mem control enabled */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 60a6de5..dea19eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -26,7 +26,12 @@ import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import java.util.List;
+/**
+ * The QueryDataSource contains all the seq and unseq TsFileResources for one timeseries in one
+ * query
+ */
public class QueryDataSource {
+
private List<TsFileResource> seqResources;
private List<TsFileResource> unseqResources;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index d3b5029..e7e83db 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -42,6 +42,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+/**
+ * ReadOnlyMemChunk is a snapshot of the working MemTable and flushing memtable in the memory used
+ * for querying
+ */
public class ReadOnlyMemChunk {
// deletion list for this chunk
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5ee4adf..2b2185d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -178,12 +178,14 @@ public class StorageGroupProcessor {
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
+ /** sequence tsfile processors which are closing */
private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
// upgrading unsequence TsFile resource list
private List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();
+ /** unsequence tsfile processors which are closing */
private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
@@ -215,10 +217,16 @@ public class StorageGroupProcessor {
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+ /** virtual storage group id */
private String virtualStorageGroupId;
+
+ /** logical storage group name */
private String logicalStorageGroupName;
+
+ /** storage group system directory */
private File storageGroupSysDir;
- // manage seqFileList and unSeqFileList
+
+ /** manage seqFileList and unSeqFileList */
private TsFileManagement tsFileManagement;
/**
* time partition id -> version controller which assigns a version for each MemTable and
@@ -232,8 +240,12 @@ public class StorageGroupProcessor {
*/
private long dataTTL = Long.MAX_VALUE;
+ /** file system factory (local or hdfs) */
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
+ /** file flush policy */
private TsFileFlushPolicy fileFlushPolicy;
+
/**
* The max file versions in each partition. By recording this, if several IoTDB instances have the
* same policy of closing file and their ingestion is identical, then files of the same version in
@@ -242,6 +254,7 @@ public class StorageGroupProcessor {
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
+ /** storage group info for mem control */
private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
/**
* Record the device number of the last TsFile in each storage group, which is applied to
@@ -250,8 +263,13 @@ public class StorageGroupProcessor {
*/
private int deviceNumInLastClosedTsFile = DeviceTimeIndex.INIT_ARRAY_SIZE;
+ /** whether it's ready from recovery */
private boolean isReady = false;
+
+ /** close file listeners */
private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
+
+ /** flush listeners */
private List<FlushListener> customFlushListeners = Collections.emptyList();
private static final int WAL_BUFFER_SIZE =
@@ -271,6 +289,10 @@ public class StorageGroupProcessor {
// DEFAULT_POOL_TRIM_INTERVAL_MILLIS
private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
+ /**
+ * record which method currently holds the insertWriteLock in SG; it will be an empty string if
+ * no one holds the insertWriteLock
+ */
private String insertWriteLockHolder = "";
/** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
@@ -414,6 +436,7 @@ public class StorageGroupProcessor {
return ret;
}
+ /** recover from file */
private void recover() throws StorageGroupProcessorException {
logger.info("recover Storage Group {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
@@ -793,6 +816,11 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * insert one row of data
+ *
+ * @param insertRowPlan one row of data
+ */
public void insert(InsertRowPlan insertRowPlan)
throws WriteProcessException, TriggerExecutionException {
// reject insertions that are out of ttl
@@ -1122,6 +1150,11 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * mem control module uses this method to flush memtable
+ *
+ * @param tsFileProcessor tsfile processor whose memtable is to be flushed
+ */
public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
writeLock("submitAFlushTaskWhenShouldFlush");
try {
@@ -1272,6 +1305,12 @@ public class StorageGroupProcessor {
return TsFileResource.getNewTsFileName(System.currentTimeMillis(), version, 0, 0);
}
+ /**
+ * close one tsfile processor
+ *
+ * @param sequence whether this tsfile processor is sequence or not
+ * @param tsFileProcessor tsfile processor
+ */
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
synchronized (closeStorageGroupCondition) {
try {
@@ -1298,7 +1337,12 @@ public class StorageGroupProcessor {
}
}
- /** thread-safety should be ensured by caller */
+ /**
+ * close one tsfile processor, thread-safety should be ensured by caller
+ *
+ * @param sequence whether this tsfile processor is sequence or not
+ * @param tsFileProcessor tsfile processor
+ */
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
// for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
@@ -1337,7 +1381,11 @@ public class StorageGroupProcessor {
}
}
- /** delete the storageGroup's own folder in folder data/system/storage_groups */
+ /**
+ * delete the storageGroup's own folder in folder data/system/storage_groups
+ *
+ * @param systemDir system dir
+ */
public void deleteFolder(String systemDir) {
logger.info(
"{} will close all files for deleting data folder {}",
@@ -1356,6 +1404,7 @@ public class StorageGroupProcessor {
}
}
+ /** close all tsfile resources */
public void closeAllResources() {
for (TsFileResource tsFileResource : tsFileManagement.getTsFileList(false)) {
try {
@@ -1373,6 +1422,7 @@ public class StorageGroupProcessor {
}
}
+ /** release wal buffer */
public void releaseWalDirectByteBufferPool() {
synchronized (walByteBufferPool) {
while (!walByteBufferPool.isEmpty()) {
@@ -1382,6 +1432,7 @@ public class StorageGroupProcessor {
}
}
+ /** delete tsfile */
public void syncDeleteDataFiles() {
logger.info(
"{} will close all files for deleting data files",
@@ -1522,6 +1573,7 @@ public class StorageGroupProcessor {
}
}
+ /** close all working tsfile processors */
public void asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");
try {
@@ -1543,6 +1595,7 @@ public class StorageGroupProcessor {
}
}
+ /** force close all working tsfile processors */
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
@@ -1565,6 +1618,14 @@ public class StorageGroupProcessor {
}
// TODO need a read lock, please consider the concurrency with flush manager threads.
+ /**
+ * build query data source by searching all tsfile which fit in query filter
+ *
+ * @param fullPath data path
+ * @param context query context
+ * @param timeFilter time filter
+ * @return query data source
+ */
public QueryDataSource query(
PartialPath fullPath,
QueryContext context,
@@ -1605,6 +1666,7 @@ public class StorageGroupProcessor {
}
}
+ /** lock the read lock of the insert lock */
public void readLock() {
// apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable
insertLock.readLock().lock();
@@ -1612,16 +1674,19 @@ public class StorageGroupProcessor {
tsFileManagement.readLock();
}
+ /** unlock the read lock of insert lock */
public void readUnlock() {
tsFileManagement.readUnLock();
insertLock.readLock().unlock();
}
+ /** lock the write lock of the insert lock */
public void writeLock(String holder) {
insertLock.writeLock().lock();
insertWriteLockHolder = holder;
}
+ /** unlock the write lock of the insert lock */
public void writeUnlock() {
insertWriteLockHolder = "";
insertLock.writeLock().unlock();
@@ -2029,6 +2094,7 @@ public class StorageGroupProcessor {
return upgradeFileCount.get();
}
+ /** upgrade all files belonging to this storage group */
public void upgrade() {
for (TsFileResource seqTsFileResource : upgradeSeqFileList) {
seqTsFileResource.setSeq(true);
@@ -2117,6 +2183,11 @@ public class StorageGroupProcessor {
resources.clear();
}
+ /**
+ * merge file under this storage group processor
+ *
+ * @param isFullMerge whether this merge is a full merge or not
+ */
public void merge(boolean isFullMerge) {
writeLock("merge");
try {
@@ -2700,6 +2771,11 @@ public class StorageGroupProcessor {
return true;
}
+ /**
+ * get all working sequence tsfile processors
+ *
+ * @return all working sequence tsfile processors
+ */
public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
return workSequenceTsFileProcessors.values();
}
@@ -2757,6 +2833,11 @@ public class StorageGroupProcessor {
return true;
}
+ /**
+ * get all working unsequence tsfile processors
+ *
+ * @return all working unsequence tsfile processors
+ */
public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
return workUnsequenceTsFileProcessors.values();
}
@@ -2901,6 +2982,11 @@ public class StorageGroupProcessor {
return tsFileManagement;
}
+ /**
+ * insert a batch of rows belonging to one device
+ *
+ * @param insertRowsOfOneDevicePlan batch of rows belonging to one device
+ */
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws WriteProcessException, TriggerExecutionException {
writeLock("InsertRowsOfOneDevice");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 99d22e0..28f0f6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -84,28 +84,43 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
+ /** logger for this class */
private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class);
+ /** storage group name of this tsfile */
private final String storageGroupName;
+ /** IoTDB config */
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ /** whether mem control is enabled */
private final boolean enableMemControl = config.isEnableMemControl();
+
+ /** storage group info for mem control */
private StorageGroupInfo storageGroupInfo;
+ /** tsfile processor info for mem control */
private TsFileProcessorInfo tsFileProcessorInfo;
/** sync this object in query() and asyncTryToFlush() */
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
+ /** modification to memtable mapping */
private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
+
+ /** writer for restore tsfile and flushing */
private RestorableTsFileIOWriter writer;
+
+ /** tsfile resource for indexing this tsfile */
private final TsFileResource tsFileResource;
- // time range index to indicate this processor belongs to which time range
+
+ /** time range index to indicate this processor belongs to which time range */
private long timeRangeId;
/**
* Whether the processor is in the queue of the FlushManager or being flushed by a flush thread.
*/
private volatile boolean managedByFlushManager;
+ /** a lock to mutually exclude flush and query */
private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
* It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
@@ -113,20 +128,29 @@ public class TsFileProcessor {
*/
private volatile boolean shouldClose;
+ /** working memtable */
private IMemTable workMemTable;
/** this callback is called before the workMemtable is added into the flushingMemTables. */
private final UpdateEndTimeCallBack updateLatestFlushTimeCallback;
+ /** Wal log node */
private WriteLogNode logNode;
+
+ /** whether it's a sequence file or not */
private final boolean sequence;
+
+ /** total memtable size for mem control */
private long totalMemTableSize;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE =
"{}: {} get flushQueryLock write lock released";
+ /** close file listener */
private List<CloseFileListener> closeFileListeners = new ArrayList<>();
+
+ /** flush file listener */
private List<FlushListener> flushListeners = new ArrayList<>();
@SuppressWarnings("squid:S107")
@@ -591,6 +615,7 @@ public class TsFileProcessor {
logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath());
}
+ /** async close one tsfile, register and close it by another thread */
void asyncClose() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1015,6 +1040,7 @@ public class TsFileProcessor {
}
}
+ /** end file and write some meta */
private void endFile() throws IOException, TsFileProcessorException {
logger.info("Start to end file {}", tsFileResource);
long closeStartTime = System.currentTimeMillis();
@@ -1054,6 +1080,11 @@ public class TsFileProcessor {
this.managedByFlushManager = managedByFlushManager;
}
+ /**
+ * get WAL log node
+ *
+ * @return WAL log node
+ */
public WriteLogNode getLogNode() {
if (logNode == null) {
logNode =
@@ -1065,6 +1096,7 @@ public class TsFileProcessor {
return logNode;
}
+ /** close this tsfile */
public void close() throws TsFileProcessorException {
try {
// when closing resource file, its corresponding mod file is also closed.
@@ -1090,6 +1122,7 @@ public class TsFileProcessor {
return storageGroupName;
}
+ /** get modifications from a memtable */
private List<Modification> getModificationsForMemtable(IMemTable memTable) {
List<Modification> modifications = new ArrayList<>();
boolean foundMemtable = false;
@@ -1102,6 +1135,14 @@ public class TsFileProcessor {
return modifications;
}
+ /**
+ * construct a deletion list from a memtable
+ *
+ * @param memTable memtable
+ * @param deviceId device id
+ * @param measurement measurement name
+ * @param timeLowerBound time water mark
+ */
private List<TimeRange> constructDeletionList(
IMemTable memTable, String deviceId, String measurement, long timeLowerBound)
throws MetadataException {
@@ -1234,6 +1275,7 @@ public class TsFileProcessor {
this.timeRangeId = timeRangeId;
}
+ /** release resource of a memtable */
public void putMemTableBackAndClose() throws TsFileProcessorException {
if (workMemTable != null) {
workMemTable.release();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c84c5c2..9449437 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -32,7 +32,11 @@ import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -50,9 +54,21 @@ import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.*;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_MERGECNT_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_TIME_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_VERSION_INDEX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@SuppressWarnings("java:S1135") // ignore todos
@@ -64,7 +80,7 @@ public class TsFileResource {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // tsfile
+ /** this tsfile */
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
@@ -79,6 +95,7 @@ public class TsFileResource {
private TsFileProcessor processor;
+ /** time index */
protected ITimeIndex timeIndex;
/** time index type, fileTimeIndex = 0, deviceTimeIndex = 1 */
@@ -200,6 +217,10 @@ public class TsFileResource {
this.timeIndexType = 1;
}
+ /**
+ * Unclosed tsfiles don't have TimeSeriesMetadata and memtables in the memory don't have
+ * chunkMetadata, but queries will use both, so we need to generate them here.
+ */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
private void generateTimeSeriesMetadata() throws IOException {
TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata();
@@ -324,6 +345,7 @@ public class TsFileResource {
fsFactory.moveFile(src, dest);
}
+ /** deserialize from disk */
public void deserialize() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
readVersionNumber(inputStream);
@@ -341,6 +363,7 @@ public class TsFileResource {
}
}
+ /** deserialize tsfile resource from old file */
public void deserializeFromOldFile() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
// deserialize old TsfileResource
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
index 072698f..93650fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
public enum TimeIndexLevel {
+ /** file to time index (small memory footprint) */
FILE_TIME_INDEX,
+
+ /** device to time index (large memory footprint) */
DEVICE_TIME_INDEX;
public ITimeIndex getTimeIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
index 7a3fb31..8d9d122 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.utils.TestOnly;
public class HashVirtualPartitioner implements VirtualPartitioner {
+ /** total number of virtual storage groups */
public static int STORAGE_GROUP_NUM =
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index b7c86b1..e4faadb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -43,6 +43,7 @@ import java.util.Map;
public class VirtualStorageGroupManager {
+ /** logger of this class */
private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
/** virtual storage group partitioner */
@@ -393,6 +394,7 @@ public class VirtualStorageGroupManager {
}
}
+ /** release resource of direct wal buffer */
public void releaseWalDirectByteBufferPool() {
for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
if (storageGroupProcessor != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 0a73f16..98646fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -117,6 +117,7 @@ public class SimpleFileVersionController implements VersionController {
prevVersion = currVersion;
}
+ /** recovery from disk */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
index b4cdcc4..65ef534 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
@@ -24,6 +24,12 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import java.util.List;
import java.util.Objects;
+/**
+ * VectorPartialPath represents a vector's fullPath. It not only contains the full path of vector's
+ * own name, but also has subSensorsPathList which contain all the fullPath of vector's sub sensors.
+ * e.g. VectorPartialPath1(root.sg1.d1.vector1, [root.sg1.d1.vector1.s1, root.sg1.d1.vector1.s2])
+ * VectorPartialPath2(root.sg1.d1.vector2, [root.sg1.d1.vector2.s1, root.sg1.d1.vector2.s2])
+ */
public class VectorPartialPath extends PartialPath {
private List<PartialPath> subSensorsPathList;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index dcd892f..3296cc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -237,6 +237,11 @@ public class MNode implements Serializable {
return fullPath;
}
+ /**
+ * get partial path of this node
+ *
+ * @return partial path
+ */
public PartialPath getPartialPath() {
List<String> detachedPath = new ArrayList<>();
MNode temp = this;
@@ -316,6 +321,12 @@ public class MNode implements Serializable {
}
}
+ /**
+ * replace a child of this mnode
+ *
+ * @param measurement measurement name
+ * @param newChildNode new child node
+ */
public void replaceChild(String measurement, MNode newChildNode) {
MNode oldChildNode = this.getChild(measurement);
if (oldChildNode == null) {
@@ -343,6 +354,11 @@ public class MNode implements Serializable {
this.fullPath = fullPath;
}
+ /**
+ * get upper template of this node, i.e. the nearest template along the path from this node to root
+ *
+ * @return upper template
+ */
public Template getUpperTemplate() {
MNode cur = this;
while (cur != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index fb24c04..e2d309b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -40,12 +40,16 @@ public class MeasurementMNode extends MNode {
/** measurement's Schema for one timeseries represented by current leaf node */
private IMeasurementSchema schema;
+ /** alias name of this measurement */
private String alias;
- // tag/attribute's start offset in tag file
+
+ /** tag/attribute's start offset in tag file */
private long offset = -1;
+ /** last value cache */
private TimeValuePair cachedLastValuePair = null;
+ /** registered trigger */
private TriggerExecutor triggerExecutor = null;
/** @param alias alias of measurementName */
@@ -77,6 +81,13 @@ public class MeasurementMNode extends MNode {
return cachedLastValuePair;
}
+ /**
+ * update last point cache
+ *
+ * @param timeValuePair last point
+ * @param highPriorityUpdate whether it's a high priority update
+ * @param latestFlushedTime latest flushed time
+ */
public synchronized void updateCachedLast(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -180,6 +191,12 @@ public class MeasurementMNode extends MNode {
return node;
}
+ /**
+ * get data type
+ *
+ * @param measurementId if it's a vector schema, we need sensor name of it
+ * @return measurement data type
+ */
public TSDataType getDataType(String measurementId) {
if (schema instanceof MeasurementSchema) {
return schema.getType();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index a44391d..5a59c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -41,6 +41,11 @@ public class Template {
private Map<String, IMeasurementSchema> schemaMap = new HashMap<>();
+ /**
+ * build a template from a createTemplatePlan
+ *
+ * @param plan createTemplatePlan
+ */
public Template(CreateTemplatePlan plan) {
name = plan.getName();
@@ -104,6 +109,12 @@ public class Template {
this.schemaMap = schemaMap;
}
+ /**
+ * check whether a timeseries path is compatible with this template
+ *
+ * @param path timeseries path
+ * @return whether we can create this new timeseries (whether it's compatible with this template)
+ */
public boolean isCompatible(PartialPath path) {
return !(schemaMap.containsKey(path.getMeasurement())
|| schemaMap.containsKey(path.getDevicePath().getMeasurement()));
@@ -141,6 +152,26 @@ public class Template {
return schemaMap.get(measurementName).getMeasurementId();
}
+ /**
+ * get all path in this template (to support aligned by device query)
+ *
+ * @return a hash map that looks like {vector -> [s1, s2, s3], normal_timeseries -> []}
+ */
+ public HashMap<String, List<String>> getAllPath() {
+ HashMap<String, List<String>> res = new HashMap<>();
+ for (Map.Entry<String, IMeasurementSchema> schemaEntry : schemaMap.entrySet()) {
+ if (schemaEntry.getValue() instanceof VectorMeasurementSchema) {
+ VectorMeasurementSchema vectorMeasurementSchema =
+ (VectorMeasurementSchema) schemaEntry.getValue();
+ res.put(schemaEntry.getKey(), vectorMeasurementSchema.getValueMeasurementIdList());
+ } else {
+ res.put(schemaEntry.getKey(), new ArrayList<>());
+ }
+ }
+
+ return res;
+ }
+
@Override
public boolean equals(Object t) {
if (this == t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 15d9b48..796acc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -46,7 +46,9 @@ public class RawDataQueryPlan extends QueryPlan {
private IExpression expression = null;
private Map<String, Set<String>> deviceToMeasurements = new HashMap<>();
+ /** used to group all the sub sensors of one vector into VectorPartialPath */
private List<PartialPath> deduplicatedVectorPaths = new ArrayList<>();
+
private List<TSDataType> deduplicatedVectorDataTypes = new ArrayList<>();
public RawDataQueryPlan() {
@@ -93,9 +95,15 @@ public class RawDataQueryPlan extends QueryPlan {
}
}
+ // TODO Maybe we should get VectorPartialPath above from MTree
+ // Currently, the above processing will only produce PartialPath instead of VectorPartialPath
+ // even if the queried time series is vector
+ // So, we need to transform the PartialPath to VectorPartialPath if it is a vector.
if (!isRawQuery()) {
transformPaths(IoTDB.metaManager);
} else {
+ // if it is a RawQueryWithoutValueFilter, we also need to group all the subSensors of one
+ // vector into one VectorPartialPath
transformVectorPaths(physicalGenerator, columnForDisplaySet);
}
}
@@ -178,6 +186,12 @@ public class RawDataQueryPlan extends QueryPlan {
}
}
+ /**
+ * Group all the subSensors of one vector into one VectorPartialPath, and save the grouped
+ * VectorPartialPath in deduplicatedVectorPaths and deduplicatedVectorDataTypes instead of putting
+ * them directly into deduplicatedPaths and deduplicatedDataTypes, because we don't know whether
+ * the raw query has value filter here.
+ */
public void transformVectorPaths(
PhysicalGenerator physicalGenerator, Set<String> columnForDisplaySet)
throws MetadataException {
@@ -214,6 +228,10 @@ public class RawDataQueryPlan extends QueryPlan {
this.deduplicatedVectorDataTypes = deduplicatedVectorDataTypes;
}
+ /**
+ * RawQueryWithoutValueFilter should call this method to use grouped vector partial path to
+ * replace the previous deduplicatedPaths and deduplicatedDataTypes
+ */
public void transformToVector() {
if (!this.deduplicatedVectorPaths.isEmpty()) {
this.deduplicatedPaths = this.deduplicatedVectorPaths;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index e4e0894..8589b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -112,6 +112,30 @@ public class SeriesReader {
protected boolean hasCachedNextOverlappedPage;
protected BatchData cachedBatchData;
+ /**
+ * @param seriesPath For querying vector, the seriesPath should be VectorPartialPath. If the query
+ * is raw query without value filter, all sensors belonging to one vector should be all in
+ * this one VectorPartialPath's subSensorsPathList, VectorPartialPath's own fullPath
+ * represents the name of vector itself. Other queries, each sensor in one vector will have
+ * its own SeriesReader, seriesPath's subSensorsPathList contains only one sensor.
+ * @param allSensors For querying vector, allSensors contains vector name and all subSensors'
+ * names in the seriesPath
+ * <p>e.g. we have two vectors: root.sg1.d1.vector1(s1, s2) and root.sg1.d1.vector2(s1, s2),
+ * If the sql is select * from root, we will construct two SeriesReader, The first one's
+ * seriesPath is VectorPartialPath(root.sg1.d1.vector1, [root.sg1.d1.vector1.s1,
+ * root.sg1.d1.vector1.s2]) The first one's allSensors is [vector1, s1, s2] The second one's
+ * seriesPath is VectorPartialPath(root.sg1.d1.vector2, [root.sg1.d1.vector2.s1,
+ * root.sg1.d1.vector2.s2]) The second one's allSensors is [vector2, s1, s2]
+ * <p>If the sql is not RawQueryWithoutValueFilter, like select count(*) from root group by
+ * ([1, 100), 5ms), we will construct four SeriesReader The first one's seriesPath is
+ * VectorPartialPath(root.sg1.d1.vector1, [root.sg1.d1.vector1.s1]) The first one's allSensors
+ * is [vector1, s1] The second one's seriesPath is VectorPartialPath(root.sg1.d1.vector1,
+ * [root.sg1.d1.vector1.s2]) The second one's allSensors is [vector1, s2] The third one's
+ * seriesPath is VectorPartialPath(root.sg1.d1.vector2, [root.sg1.d1.vector2.s1]) The third
+ * one's allSensors is [vector2, s1] The fourth one's seriesPath is
+ * VectorPartialPath(root.sg1.d1.vector2, [root.sg1.d1.vector2.s2]) The fourth one's
+ * allSensors is [vector2, s2]
+ */
public SeriesReader(
PartialPath seriesPath,
Set<String> allSensors,
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 45fe675..1e8c51f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -113,6 +113,7 @@ public class FileLoaderUtils {
// common path
ITimeSeriesMetadata timeSeriesMetadata;
+ // If the tsfile is closed, we need to load from tsfile
if (resource.isClosed()) {
if (!resource.getTsFile().exists()) {
return null;
@@ -130,7 +131,7 @@ public class FileLoaderUtils {
timeSeriesMetadata.setChunkMetadataLoader(
new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
}
- } else {
+ } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
timeSeriesMetadata = resource.getTimeSeriesMetadata();
if (timeSeriesMetadata != null) {
timeSeriesMetadata.setChunkMetadataLoader(
@@ -156,6 +157,14 @@ public class FileLoaderUtils {
return timeSeriesMetadata;
}
+ /**
+ * Load VectorTimeSeriesMetadata for Vector
+ *
+ * @param resource corresponding TsFileResource
+ * @param seriesPath instance of VectorPartialPath, vector's full path, e.g. (root.sg1.d1.vector,
+ * [root.sg1.d1.vector.s1, root.sg1.d1.vector.s2])
+ * @param subSensorList subSensorList of the seriesPath
+ */
private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata(
TsFileResource resource,
PartialPath seriesPath,
@@ -165,10 +174,14 @@ public class FileLoaderUtils {
Set<String> allSensors)
throws IOException {
VectorTimeSeriesMetadata vectorTimeSeriesMetadata = null;
+ // If the tsfile is closed, we need to load from tsfile
if (resource.isClosed()) {
if (!resource.getTsFile().exists()) {
return null;
}
+ // load all the TimeseriesMetadata of vector, the first one is for time column and the
+ // remaining is for sub sensors
+ // the order of timeSeriesMetadata list is same as subSensorList's order
List<TimeseriesMetadata> timeSeriesMetadata =
TimeSeriesMetadataCache.getInstance()
.get(
@@ -181,6 +194,8 @@ public class FileLoaderUtils {
.collect(Collectors.toList()),
allSensors,
context.isDebug());
+
+ // assemble VectorTimeSeriesMetadata
if (timeSeriesMetadata != null && !timeSeriesMetadata.isEmpty()) {
timeSeriesMetadata
.get(0)
@@ -197,7 +212,7 @@ public class FileLoaderUtils {
timeSeriesMetadata.get(0),
timeSeriesMetadata.subList(1, timeSeriesMetadata.size()));
}
- } else {
+ } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
vectorTimeSeriesMetadata = (VectorTimeSeriesMetadata) resource.getTimeSeriesMetadata();
if (vectorTimeSeriesMetadata != null) {
vectorTimeSeriesMetadata.setChunkMetadataLoader(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 22da502..a79db95 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -38,6 +38,9 @@ public class ChunkHeader {
/**
* 1 means this chunk has more than one page, so each page has its own page statistic 5 means this
* chunk has only one page, and this page has no page statistic
+ *
+ * <p>if the 8th bit of this byte is 1, this chunk is a time chunk of one vector; if the 7th bit
+ * of this byte is 1, this chunk is a value chunk of one vector
*/
private byte chunkType;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 01f617a..6558da0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -31,7 +31,9 @@ import java.util.List;
public class VectorChunkMetadata implements IChunkMetadata {
+ // ChunkMetadata for time column
private final IChunkMetadata timeChunkMetadata;
+ // ChunkMetadata for all subSensors in the vector
private final List<IChunkMetadata> valueChunkMetadataList;
public VectorChunkMetadata(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index f82194d..87e88af 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -27,7 +27,9 @@ import java.util.List;
public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
+ // TimeSeriesMetadata for time column
private final TimeseriesMetadata timeseriesMetadata;
+ // TimeSeriesMetadata for all subSensors in the vector
private final List<TimeseriesMetadata> valueTimeseriesMetadataList;
public VectorTimeSeriesMetadata(
@@ -36,6 +38,10 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
this.valueTimeseriesMetadataList = valueTimeseriesMetadataList;
}
+ /**
+ * If the vector contains only one sub sensor, just return the sub sensor's Statistics. Otherwise,
+ * return the Statistics of the time column.
+ */
@Override
public Statistics getStatistics() {
return valueTimeseriesMetadataList.size() == 1
@@ -69,6 +75,14 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
}
}
+ /**
+ * If the chunkMetadataLoader is MemChunkMetadataLoader, the VectorChunkMetadata is already
+ * assembled while constructing the in-memory TsFileResource, so we just return the assembled
+ * VectorChunkMetadata list.
+ *
+ * <p>Otherwise, we need to assemble the ChunkMetadata of time column and the ChunkMetadata of all
+ * the subSensors to generate the VectorChunkMetadata
+ */
@Override
public List<IChunkMetadata> loadChunkMetadataList() throws IOException {
if (timeseriesMetadata.getChunkMetadataLoader().isMemChunkMetadataLoader()) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/VectorChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/VectorChunkReader.java
index 37afc31..3cb62c8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/VectorChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/VectorChunkReader.java
@@ -43,9 +43,13 @@ import java.util.List;
public class VectorChunkReader implements IChunkReader {
+ // chunk header of the time column
private final ChunkHeader timeChunkHeader;
+ // chunk headers of all the sub sensors
private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();
+ // chunk data of the time column
private final ByteBuffer timeChunkDataBuffer;
+ // chunk data of all the sub sensors
private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
private final IUnCompressor unCompressor;
private final Decoder timeDecoder =
@@ -83,6 +87,7 @@ public class VectorChunkReader implements IChunkReader {
initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
}
+ /** construct all the page readers in this chunk */
private void initAllPageReaders(
Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList)
throws IOException {
@@ -91,6 +96,8 @@ public class VectorChunkReader implements IChunkReader {
// deserialize a PageHeader from chunkDataBuffer
PageHeader timePageHeader;
List<PageHeader> valuePageHeaderList = new ArrayList<>();
+ // mask the two highest bits
+ // this chunk has only one page
if ((timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
timePageHeader = PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics);
for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
@@ -98,7 +105,7 @@ public class VectorChunkReader implements IChunkReader {
PageHeader.deserializeFrom(
valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i)));
}
- } else {
+ } else { // this chunk has more than one page
timePageHeader =
PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType());
for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
@@ -145,6 +152,7 @@ public class VectorChunkReader implements IChunkReader {
List<TSDataType> valueDataTypeList = new ArrayList<>();
List<Decoder> valueDecoderList = new ArrayList<>();
for (int i = 0; i < valuePageHeader.size(); i++) {
+ // if the page is satisfied, deserialize it
if (pageSatisfied(valuePageHeader.get(i), valueDeleteIntervalList.get(i))) {
getPageInfo(
valuePageHeader.get(i),
@@ -155,7 +163,7 @@ public class VectorChunkReader implements IChunkReader {
valuePageDataList.add(valuePageInfo.pageData);
valueDataTypeList.add(valuePageInfo.dataType);
valueDecoderList.add(valuePageInfo.decoder);
- } else {
+ } else { // if the page is not satisfied, just skip it
valueChunkDataBufferList
.get(i)
.position(
@@ -181,6 +189,14 @@ public class VectorChunkReader implements IChunkReader {
return vectorPageReader;
}
+ /**
+ * deserialize the page
+ *
+ * @param pageHeader PageHeader for current page
+ * @param chunkBuffer current chunk data buffer
+ * @param chunkHeader current chunk header
+ * @param pageInfo A struct to put the deserialized page into.
+ */
private void getPageInfo(
PageHeader pageHeader, ByteBuffer chunkBuffer, ChunkHeader chunkHeader, PageInfo pageInfo)
throws IOException {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index dd5eea3..dd0973e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -79,6 +79,10 @@ public class ValuePageReader {
this.valueBuffer = pageData.slice();
}
+ /**
+ * return a BatchData with the corresponding timeBatch, the BatchData's dataType is same as this
+ * sub sensor
+ */
public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter filter) {
BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false);
for (int i = 0; i < timeBatch.length; i++) {
@@ -130,6 +134,10 @@ public class ValuePageReader {
return pageData.flip();
}
+ /**
+ * return the value array of the corresponding time; if this sub sensor doesn't have a value at a
+ * time, just fill it with null
+ */
public TsPrimitiveType[] nextValueBatch(long[] timeBatch) {
TsPrimitiveType[] valueBatch = new TsPrimitiveType[size];
if (valueBuffer == null) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorChunkReader.java
deleted file mode 100644
index b4665fc..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorChunkReader.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader.page;
-
-public class VectorChunkReader {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
index eeb095d..09a2ab7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
@@ -71,9 +71,13 @@ public class VectorPageReader implements IPageReader {
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
long[] timeBatch = timePageReader.nexTimeBatch();
+ // if the vector contains only one sub sensor, just return a common BatchData whose DataType is
+ // same as the only one sub sensor.
if (valuePageReaderList.size() == 1) {
return valuePageReaderList.get(0).nextBatch(timeBatch, ascending, filter);
}
+
+ // if the vector contains more than one sub sensor, the BatchData's DataType is Vector
List<TsPrimitiveType[]> valueBatchList = new ArrayList<>(valueCount);
for (ValuePageReader valuePageReader : valuePageReaderList) {
valueBatchList.add(valuePageReader.nextValueBatch(timeBatch));
@@ -81,6 +85,7 @@ public class VectorPageReader implements IPageReader {
BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
boolean isNull;
for (int i = 0; i < timeBatch.length; i++) {
+ // used to record whether the sub sensors are all null in current time
isNull = true;
TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
for (int j = 0; j < v.length; j++) {
@@ -89,6 +94,8 @@ public class VectorPageReader implements IPageReader {
isNull = false;
}
}
+ // if all the sub sensors' value are null in current time
+ // or current row is not satisfied with the filter, just discard it
// TODO fix value filter v[0].getValue()
if (!isNull && (filter == null || filter.satisfy(timeBatch[i], v[0].getValue()))) {
pageData.putVector(timeBatch[i], v);