You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/06/08 06:49:55 UTC
[iotdb] branch add-java-doc updated: add java doc related to write
and flush
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch add-java-doc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/add-java-doc by this push:
new 4639186 add java doc related to write and flush
4639186 is described below
commit 46391863d532833a019ac83376e7b8db5df7ef29
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Jun 8 14:43:42 2021 +0800
add java doc related to write and flush
---
.../apache/iotdb/db/engine/flush/FlushManager.java | 7 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 9 +++
.../iotdb/db/engine/memtable/AbstractMemTable.java | 7 ++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 14 +++-
.../engine/storagegroup/StorageGroupProcessor.java | 88 +++++++++++++++++++++-
.../db/engine/storagegroup/TsFileProcessor.java | 44 ++++++++++-
.../db/engine/storagegroup/TsFileResource.java | 29 +++++--
.../storagegroup/timeindex/TimeIndexLevel.java | 3 +
.../virtualSg/HashVirtualPartitioner.java | 1 +
.../virtualSg/VirtualStorageGroupManager.java | 2 +
.../version/SimpleFileVersionController.java | 1 +
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 16 ++++
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 19 ++++-
.../iotdb/db/metadata/template/Template.java | 31 ++++++++
14 files changed, 259 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index b663999..32741bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -92,6 +92,7 @@ public class FlushManager implements FlushManagerMBean, IService {
return FlushSubTaskPoolManager.getInstance().getWaitingTasksNumber();
}
+ /** a flush thread handles flush task */
class FlushThread extends WrappedRunnable {
@Override
@@ -121,7 +122,11 @@ public class FlushManager implements FlushManagerMBean, IService {
}
}
- /** Add TsFileProcessor to asyncTryToFlush manager */
+ /**
+ * Add tsFileProcessor to asyncTryToFlush manager
+ *
+ * @param tsFileProcessor tsFileProcessor to be flushed
+ */
@SuppressWarnings("squid:S2445")
public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
synchronized (tsFileProcessor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 1d76495..7962da4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -46,6 +46,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+/**
+ * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
+ * encoding -> write to disk (io task)
+ */
public class MemTableFlushTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
@@ -115,6 +119,9 @@ public class MemTableFlushTask {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
IMeasurementSchema desc = series.getSchema();
+ /*
+ * sort task (first task of flush pipeline)
+ */
TVList tvList = series.getSortedTvListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.put(new Pair<>(tvList, desc));
@@ -158,6 +165,7 @@ public class MemTableFlushTask {
System.currentTimeMillis() - start);
}
+ /** encoding task (second task of pipeline) */
private Runnable encodingTask =
new Runnable() {
private void writeOneSeries(
@@ -343,6 +351,7 @@ public class MemTableFlushTask {
}
};
+ /** io task (third task of pipeline) */
@SuppressWarnings("squid:S135")
private Runnable ioTask =
() -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d62f8fa..170f49c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -95,6 +95,13 @@ public abstract class AbstractMemTable implements IMemTable {
return memTableMap.containsKey(deviceId) && memTableMap.get(deviceId).containsKey(measurement);
}
+ /**
+ * get the writable mem chunk of the given device and measurement, creating it if it does not exist
+ *
+ * @param deviceId device id
+ * @param schema measurement schema
+ * @return the writable mem chunk
+ */
private IWritableMemChunk createIfNotExistAndGet(String deviceId, IMeasurementSchema schema) {
Map<String, IWritableMemChunk> memSeries =
memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 5f74b01..bb28c57 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -71,9 +71,20 @@ public interface IMemTable {
long getTotalPointsNum();
+ /**
+ * insert into this memtable
+ *
+ * @param insertRowPlan insertRowPlan
+ */
void insert(InsertRowPlan insertRowPlan);
- /** [start, end) */
+ /**
+ * insert tablet into this memtable
+ *
+ * @param insertTabletPlan insertTabletPlan
+ * @param start included
+ * @param end excluded
+ */
void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException;
@@ -114,6 +125,7 @@ public interface IMemTable {
boolean shouldFlush();
+ /** release resource of this memtable */
void release();
/** must guarantee the device exists in the work memtable only used when mem control enabled */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a402e48..5a0430d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -178,12 +178,14 @@ public class StorageGroupProcessor {
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
+ /** sequence tsfile processors which are closing */
private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
// upgrading unsequence TsFile resource list
private List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();
+ /** unsequence tsfile processors which are closing */
private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
@@ -215,10 +217,16 @@ public class StorageGroupProcessor {
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+ /** virtual storage group id */
private String virtualStorageGroupId;
+
+ /** logical storage group name */
private String logicalStorageGroupName;
+
+ /** storage group system directory */
private File storageGroupSysDir;
- // manage seqFileList and unSeqFileList
+
+ /** manage seqFileList and unSeqFileList */
private TsFileManagement tsFileManagement;
/**
* time partition id -> version controller which assigns a version for each MemTable and
@@ -232,8 +240,12 @@ public class StorageGroupProcessor {
*/
private long dataTTL = Long.MAX_VALUE;
+ /** file system factory (local or hdfs) */
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
+ /** file flush policy */
private TsFileFlushPolicy fileFlushPolicy;
+
/**
* The max file versions in each partition. By recording this, if several IoTDB instances have the
* same policy of closing file and their ingestion is identical, then files of the same version in
@@ -242,6 +254,7 @@ public class StorageGroupProcessor {
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
+ /** storage group info for mem control */
private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
/**
* Record the device number of the last TsFile in each storage group, which is applied to
@@ -250,8 +263,13 @@ public class StorageGroupProcessor {
*/
private int deviceNumInLastClosedTsFile = DeviceTimeIndex.INIT_ARRAY_SIZE;
+ /** whether it's ready from recovery */
private boolean isReady = false;
+
+ /** close file listeners */
private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
+
+ /** flush listeners */
private List<FlushListener> customFlushListeners = Collections.emptyList();
private static final int WAL_BUFFER_SIZE =
@@ -414,6 +432,7 @@ public class StorageGroupProcessor {
return ret;
}
+ /** recover from file */
private void recover() throws StorageGroupProcessorException {
logger.info("recover Storage Group {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
@@ -793,6 +812,11 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * insert one row of data
+ *
+ * @param insertRowPlan one row of data
+ */
public void insert(InsertRowPlan insertRowPlan)
throws WriteProcessException, TriggerExecutionException {
// reject insertions that are out of ttl
@@ -1122,6 +1146,11 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * the mem control module uses this method to flush a memtable
+ *
+ * @param tsFileProcessor tsfile processor whose memtable is to be flushed
+ */
public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
writeLock("submitAFlushTaskWhenShouldFlush");
try {
@@ -1272,6 +1301,12 @@ public class StorageGroupProcessor {
return TsFileResource.getNewTsFileName(System.currentTimeMillis(), version, 0, 0);
}
+ /**
+ * close one tsfile processor
+ *
+ * @param sequence whether this tsfile processor is sequence or not
+ * @param tsFileProcessor tsfile processor
+ */
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
synchronized (closeStorageGroupCondition) {
try {
@@ -1298,7 +1333,12 @@ public class StorageGroupProcessor {
}
}
- /** thread-safety should be ensured by caller */
+ /**
+ * close one tsfile processor, thread-safety should be ensured by caller
+ *
+ * @param sequence whether this tsfile processor is sequence or not
+ * @param tsFileProcessor tsfile processor
+ */
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
// for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
@@ -1337,7 +1377,11 @@ public class StorageGroupProcessor {
}
}
- /** delete the storageGroup's own folder in folder data/system/storage_groups */
+ /**
+ * delete the storageGroup's own folder in folder data/system/storage_groups
+ *
+ * @param systemDir system dir
+ */
public void deleteFolder(String systemDir) {
logger.info(
"{} will close all files for deleting data folder {}",
@@ -1356,6 +1400,7 @@ public class StorageGroupProcessor {
}
}
+ /** close all tsfile resource */
public void closeAllResources() {
for (TsFileResource tsFileResource : tsFileManagement.getTsFileList(false)) {
try {
@@ -1373,6 +1418,7 @@ public class StorageGroupProcessor {
}
}
+ /** release wal buffer */
public void releaseWalDirectByteBufferPool() {
synchronized (walByteBufferPool) {
while (!walByteBufferPool.isEmpty()) {
@@ -1382,6 +1428,7 @@ public class StorageGroupProcessor {
}
}
+ /** delete tsfile */
public void syncDeleteDataFiles() {
logger.info(
"{} will close all files for deleting data files",
@@ -1522,6 +1569,7 @@ public class StorageGroupProcessor {
}
}
+ /** close all working tsfile processors */
public void asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");
try {
@@ -1543,6 +1591,7 @@ public class StorageGroupProcessor {
}
}
+ /** force close all working tsfile processors */
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
@@ -1565,6 +1614,14 @@ public class StorageGroupProcessor {
}
// TODO need a read lock, please consider the concurrency with flush manager threads.
+ /**
+ * build query data source by searching all tsfile which fit in query filter
+ *
+ * @param fullPath data path
+ * @param context query context
+ * @param timeFilter time filter
+ * @return query data source
+ */
public QueryDataSource query(
PartialPath fullPath,
QueryContext context,
@@ -1605,19 +1662,23 @@ public class StorageGroupProcessor {
}
}
+ /** lock the read lock of the insert lock */
public void readLock() {
insertLock.readLock().lock();
}
+ /** unlock the read lock of insert lock */
public void readUnlock() {
insertLock.readLock().unlock();
}
+ /** lock the write lock of the insert lock */
public void writeLock(String holder) {
insertLock.writeLock().lock();
insertWriteLockHolder = holder;
}
+ /** unlock the write lock of the insert lock */
public void writeUnlock() {
insertWriteLockHolder = "";
insertLock.writeLock().unlock();
@@ -2025,6 +2086,7 @@ public class StorageGroupProcessor {
return upgradeFileCount.get();
}
+ /** upgrade all files belonging to this storage group */
public void upgrade() {
for (TsFileResource seqTsFileResource : upgradeSeqFileList) {
seqTsFileResource.setSeq(true);
@@ -2113,6 +2175,11 @@ public class StorageGroupProcessor {
resources.clear();
}
+ /**
+ * merge file under this storage group processor
+ *
+ * @param isFullMerge whether this merge is a full merge or not
+ */
public void merge(boolean isFullMerge) {
writeLock("merge");
try {
@@ -2696,6 +2763,11 @@ public class StorageGroupProcessor {
return true;
}
+ /**
+ * get all working sequence tsfile processors
+ *
+ * @return all working sequence tsfile processors
+ */
public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
return workSequenceTsFileProcessors.values();
}
@@ -2753,6 +2825,11 @@ public class StorageGroupProcessor {
return true;
}
+ /**
+ * get all working unsequence tsfile processors
+ *
+ * @return all working unsequence tsfile processors
+ */
public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
return workUnsequenceTsFileProcessors.values();
}
@@ -2897,6 +2974,11 @@ public class StorageGroupProcessor {
return tsFileManagement;
}
+ /**
+ * insert a batch of rows belonging to one device
+ *
+ * @param insertRowsOfOneDevicePlan a batch of rows belonging to one device
+ */
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws WriteProcessException, TriggerExecutionException {
writeLock("InsertRowsOfOneDevice");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 99d22e0..28f0f6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -84,28 +84,43 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
+ /** logger for this class */
private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class);
+ /** storage group name of this tsfile */
private final String storageGroupName;
+ /** IoTDB config */
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ /** whether mem control is enabled */
private final boolean enableMemControl = config.isEnableMemControl();
+
+ /** storage group info for mem control */
private StorageGroupInfo storageGroupInfo;
+ /** tsfile processor info for mem control */
private TsFileProcessorInfo tsFileProcessorInfo;
/** sync this object in query() and asyncTryToFlush() */
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
+ /** modification to memtable mapping */
private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
+
+ /** writer for restore tsfile and flushing */
private RestorableTsFileIOWriter writer;
+
+ /** tsfile resource used to index this tsfile */
private final TsFileResource tsFileResource;
- // time range index to indicate this processor belongs to which time range
+
+ /** time range index to indicate this processor belongs to which time range */
private long timeRangeId;
/**
* Whether the processor is in the queue of the FlushManager or being flushed by a flush thread.
*/
private volatile boolean managedByFlushManager;
+ /** a lock for mutual exclusion between flush and query */
private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
* It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
@@ -113,20 +128,29 @@ public class TsFileProcessor {
*/
private volatile boolean shouldClose;
+ /** working memtable */
private IMemTable workMemTable;
/** this callback is called before the workMemtable is added into the flushingMemTables. */
private final UpdateEndTimeCallBack updateLatestFlushTimeCallback;
+ /** Wal log node */
private WriteLogNode logNode;
+
+ /** whether it's a sequence file or not */
private final boolean sequence;
+
+ /** total memtable size for mem control */
private long totalMemTableSize;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE =
"{}: {} get flushQueryLock write lock released";
+ /** close file listener */
private List<CloseFileListener> closeFileListeners = new ArrayList<>();
+
+ /** flush file listener */
private List<FlushListener> flushListeners = new ArrayList<>();
@SuppressWarnings("squid:S107")
@@ -591,6 +615,7 @@ public class TsFileProcessor {
logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath());
}
+ /** async close one tsfile, register and close it by another thread */
void asyncClose() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1015,6 +1040,7 @@ public class TsFileProcessor {
}
}
+ /** end file and write some meta */
private void endFile() throws IOException, TsFileProcessorException {
logger.info("Start to end file {}", tsFileResource);
long closeStartTime = System.currentTimeMillis();
@@ -1054,6 +1080,11 @@ public class TsFileProcessor {
this.managedByFlushManager = managedByFlushManager;
}
+ /**
+ * get WAL log node
+ *
+ * @return WAL log node
+ */
public WriteLogNode getLogNode() {
if (logNode == null) {
logNode =
@@ -1065,6 +1096,7 @@ public class TsFileProcessor {
return logNode;
}
+ /** close this tsfile */
public void close() throws TsFileProcessorException {
try {
// when closing resource file, its corresponding mod file is also closed.
@@ -1090,6 +1122,7 @@ public class TsFileProcessor {
return storageGroupName;
}
+ /** get modifications from a memtable */
private List<Modification> getModificationsForMemtable(IMemTable memTable) {
List<Modification> modifications = new ArrayList<>();
boolean foundMemtable = false;
@@ -1102,6 +1135,14 @@ public class TsFileProcessor {
return modifications;
}
+ /**
+ * construct a deletion list from a memtable
+ *
+ * @param memTable memtable
+ * @param deviceId device id
+ * @param measurement measurement name
+ * @param timeLowerBound time water mark
+ */
private List<TimeRange> constructDeletionList(
IMemTable memTable, String deviceId, String measurement, long timeLowerBound)
throws MetadataException {
@@ -1234,6 +1275,7 @@ public class TsFileProcessor {
this.timeRangeId = timeRangeId;
}
+ /** release resource of a memtable */
public void putMemTableBackAndClose() throws TsFileProcessorException {
if (workMemTable != null) {
workMemTable.release();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c84c5c2..1d46900 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -32,7 +32,11 @@ import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -50,9 +54,21 @@ import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.*;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_MERGECNT_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_TIME_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_VERSION_INDEX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@SuppressWarnings("java:S1135") // ignore todos
@@ -64,7 +80,7 @@ public class TsFileResource {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // tsfile
+ /** this tsfile */
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
@@ -79,6 +95,7 @@ public class TsFileResource {
private TsFileProcessor processor;
+ /** time index */
protected ITimeIndex timeIndex;
/** time index type, fileTimeIndex = 0, deviceTimeIndex = 1 */
@@ -324,6 +341,7 @@ public class TsFileResource {
fsFactory.moveFile(src, dest);
}
+ /** deserialize from disk */
public void deserialize() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
readVersionNumber(inputStream);
@@ -341,6 +359,7 @@ public class TsFileResource {
}
}
+ /** deserialize tsfile resource from old file */
public void deserializeFromOldFile() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
// deserialize old TsfileResource
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
index 072698f..93650fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/TimeIndexLevel.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.engine.storagegroup.timeindex;
public enum TimeIndexLevel {
+ /** file to time index (small memory footprint) */
FILE_TIME_INDEX,
+
+ /** device to time index (large memory footprint) */
DEVICE_TIME_INDEX;
public ITimeIndex getTimeIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
index 7a3fb31..8d9d122 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.utils.TestOnly;
public class HashVirtualPartitioner implements VirtualPartitioner {
+ /** total number of virtual storage groups */
public static int STORAGE_GROUP_NUM =
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index b7c86b1..e4faadb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -43,6 +43,7 @@ import java.util.Map;
public class VirtualStorageGroupManager {
+ /** logger of this class */
private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
/** virtual storage group partitioner */
@@ -393,6 +394,7 @@ public class VirtualStorageGroupManager {
}
}
+ /** release resource of direct wal buffer */
public void releaseWalDirectByteBufferPool() {
for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
if (storageGroupProcessor != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 0a73f16..98646fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -117,6 +117,7 @@ public class SimpleFileVersionController implements VersionController {
prevVersion = currVersion;
}
+ /** recover from disk */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index dcd892f..3296cc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -237,6 +237,11 @@ public class MNode implements Serializable {
return fullPath;
}
+ /**
+ * get partial path of this node
+ *
+ * @return partial path
+ */
public PartialPath getPartialPath() {
List<String> detachedPath = new ArrayList<>();
MNode temp = this;
@@ -316,6 +321,12 @@ public class MNode implements Serializable {
}
}
+ /**
+ * replace a child of this mnode
+ *
+ * @param measurement measurement name
+ * @param newChildNode new child node
+ */
public void replaceChild(String measurement, MNode newChildNode) {
MNode oldChildNode = this.getChild(measurement);
if (oldChildNode == null) {
@@ -343,6 +354,11 @@ public class MNode implements Serializable {
this.fullPath = fullPath;
}
+ /**
+ * get upper template of this node, i.e. the nearest template on the path from this node to root
+ *
+ * @return upper template
+ */
public Template getUpperTemplate() {
MNode cur = this;
while (cur != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index fb24c04..e2d309b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -40,12 +40,16 @@ public class MeasurementMNode extends MNode {
/** measurement's Schema for one timeseries represented by current leaf node */
private IMeasurementSchema schema;
+ /** alias name of this measurement */
private String alias;
- // tag/attribute's start offset in tag file
+
+ /** tag/attribute's start offset in tag file */
private long offset = -1;
+ /** last value cache */
private TimeValuePair cachedLastValuePair = null;
+ /** registered trigger */
private TriggerExecutor triggerExecutor = null;
/** @param alias alias of measurementName */
@@ -77,6 +81,13 @@ public class MeasurementMNode extends MNode {
return cachedLastValuePair;
}
+ /**
+ * update last point cache
+ *
+ * @param timeValuePair last point
+ * @param highPriorityUpdate whether it's a high priority update
+ * @param latestFlushedTime latest flushed time
+ */
public synchronized void updateCachedLast(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -180,6 +191,12 @@ public class MeasurementMNode extends MNode {
return node;
}
+ /**
+ * get data type
+ *
+ * @param measurementId if it's a vector schema, we need sensor name of it
+ * @return measurement data type
+ */
public TSDataType getDataType(String measurementId) {
if (schema instanceof MeasurementSchema) {
return schema.getType();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index a44391d..5a59c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -41,6 +41,11 @@ public class Template {
private Map<String, IMeasurementSchema> schemaMap = new HashMap<>();
+ /**
+ * build a template from a createTemplatePlan
+ *
+ * @param plan createTemplatePlan
+ */
public Template(CreateTemplatePlan plan) {
name = plan.getName();
@@ -104,6 +109,12 @@ public class Template {
this.schemaMap = schemaMap;
}
+ /**
+ * check whether a timeseries path is compatible with this template
+ *
+ * @param path timeseries path
+ * @return whether we can create this new timeseries (whether it's compatible with this template)
+ */
public boolean isCompatible(PartialPath path) {
return !(schemaMap.containsKey(path.getMeasurement())
|| schemaMap.containsKey(path.getDevicePath().getMeasurement()));
@@ -141,6 +152,26 @@ public class Template {
return schemaMap.get(measurementName).getMeasurementId();
}
+ /**
+ * get all paths in this template (to support aligned by device query)
+ *
+ * @return a hash map that looks like {vector -> [s1, s2, s3], normal_timeseries -> []}
+ */
+ public HashMap<String, List<String>> getAllPath() {
+ HashMap<String, List<String>> res = new HashMap<>();
+ for (Map.Entry<String, IMeasurementSchema> schemaEntry : schemaMap.entrySet()) {
+ if (schemaEntry.getValue() instanceof VectorMeasurementSchema) {
+ VectorMeasurementSchema vectorMeasurementSchema =
+ (VectorMeasurementSchema) schemaEntry.getValue();
+ res.put(schemaEntry.getKey(), vectorMeasurementSchema.getValueMeasurementIdList());
+ } else {
+ res.put(schemaEntry.getKey(), new ArrayList<>());
+ }
+ }
+
+ return res;
+ }
+
@Override
public boolean equals(Object t) {
if (this == t) {