You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/19 15:50:11 UTC
[incubator-iotdb] 03/03: improve the implementation of write and
insert process in fileNodeProcessor
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4c18b6e41dc997bf46789b3fef5d470c172ed51b
Author: lta <li...@163.com>
AuthorDate: Wed Jun 19 23:49:45 2019 +0800
improve the implementation of write and insert process in fileNodeProcessor
---
iotdb/iotdb/conf/iotdb-engine.properties | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +-
.../java/org/apache/iotdb/db/engine/Processor.java | 10 +-
.../iotdb/db/engine/UnsealedTsFileProcessorV2.java | 32 +++--
.../engine/bufferwrite/BufferWriteProcessor.java | 16 ++-
.../bufferwrite/RestorableTsFileIOWriter.java | 8 +-
.../db/engine/bufferwriteV2/FlushManager.java | 4 +-
.../iotdb/db/engine/cache/TsFileMetaDataCache.java | 2 +-
.../iotdb/db/engine/cache/TsFileMetadataUtils.java | 2 +-
...teLinkedList.java => CopyOnReadLinkedList.java} | 4 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 26 ++--
.../db/engine/filenode/FileNodeProcessor.java | 18 +--
.../db/engine/filenode/FileNodeProcessorStore.java | 4 +-
.../iotdb/db/engine/filenode/TsFileResource.java | 4 +-
.../db/engine/filenodeV2/FileNodeManagerV2.java | 4 +-
.../filenodeV2/FileNodeProcessorStoreV2.java | 4 +-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 133 ++++++++++-----------
.../db/engine/memcontrol/BasicMemController.java | 6 +-
.../apache/iotdb/db/engine/memtable/Callback.java | 4 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 +-
.../db/engine/memtable/MemTableFlushTask.java | 2 +-
.../db/engine/memtable/MemTableFlushTaskV2.java | 7 +-
.../iotdb/db/engine/memtable/MemTablePool.java | 3 +-
.../db/engine/overflow/io/OverflowProcessor.java | 10 +-
.../db/engine/overflow/io/OverflowResource.java | 2 +-
.../iotdb/db/engine/pool/MergePoolManager.java | 2 +-
.../db/engine/querycontext/ReadOnlyMemChunk.java | 2 +-
.../org/apache/iotdb/db/metadata/MManager.java | 6 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 2 +-
.../db/qp/physical/transfer/CodecInstances.java | 20 ++--
.../db/query/aggregation/AggregateFunction.java | 8 +-
.../iotdb/db/query/context/QueryContext.java | 2 +-
.../iotdb/db/query/control/FileReaderManager.java | 2 +-
.../db/query/control/QueryResourceManager.java | 2 +-
.../query/dataset/AggreResultDataPointReader.java | 2 +-
.../java/org/apache/iotdb/db/query/fill/IFill.java | 2 +-
.../org/apache/iotdb/db/query/reader/IReader.java | 2 +-
.../apache/iotdb/db/service/CloseMergeService.java | 20 ++--
.../java/org/apache/iotdb/db/service/IService.java | 2 +-
.../org/apache/iotdb/db/service/JDBCService.java | 6 +-
.../org/apache/iotdb/db/service/JMXService.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 8 +-
.../iotdb/db/sync/conf/SyncSenderDescriptor.java | 4 +-
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 10 +-
.../iotdb/db/sync/receiver/SyncServiceManager.java | 6 +-
.../iotdb/db/sync/sender/SyncSenderImpl.java | 2 +-
.../java/org/apache/iotdb/db/utils/IOUtils.java | 8 +-
.../org/apache/iotdb/db/utils/LoadDataUtils.java | 4 +-
.../org/apache/iotdb/db/writelog/RecoverStage.java | 2 +-
.../iotdb/db/writelog/io/SingleFileLogReader.java | 2 +-
.../writelog/manager/MultiFileLogNodeManager.java | 2 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 4 +-
.../iotdb/db/writelog/node/WriteLogNode.java | 2 +-
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
.../engine/bufferwrite/BufferWriteBenchmark.java | 2 +-
.../engine/overflow/io/OverflowResourceTest.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +-
.../apache/iotdb/db/utils/OpenFileNumUtilTest.java | 4 +-
.../write/writer/NativeRestorableIOWriter.java | 2 +-
62 files changed, 240 insertions(+), 237 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index 848696c..bc43953 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -116,7 +116,7 @@ force_wal_period_in_ms=10
# The maximum concurrent thread number for merging overflow
# Increase this value, it will increase IO and CPU consumption
-# Decrease this value, when there is much overflow data, it will increase disk usage, which will reduce read speed
+# Decrease this value, when there is much overflow data, it will increase disk usage, which will reduce cloneList speed
# When the value<=0 or > CPU core number, use the CPU core number.
merge_concurrent_threads=0
@@ -127,7 +127,7 @@ merge_concurrent_threads=0
# For an application, the total amount of folder is equal to the number of storage_group settings in SQL
max_opened_folder=100
-# The amount of data that is read every time when IoTDB merge data.
+# The amount of data that is cloneList every time when IoTDB merge data.
fetch_size=10000
# The period time of flushing data from memory to file.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ebaa947..b830929 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -124,7 +124,7 @@ public class IoTDBConfig {
private String indexFileDir = "index";
/**
- * Temporary directory for temporary files of read (External Sort). TODO: unused field
+ * Temporary directory for temporary files of cloneList (External Sort). TODO: unused field
*/
private String readTmpFileDir = "readTmp";
@@ -140,7 +140,7 @@ public class IoTDBConfig {
private int maxOpenFolder = 100;
/**
- * The amount of data that is read every time when IoTDB merges data.
+ * The amount of data that is cloneList every time when IoTDB merges data.
*/
private int fetchSize = 10000;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index e2775cd..2d3b56c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,8 +57,8 @@ public class IoTDBConstant {
public static final String MIN_TIME = "min_time";
public static final int MIN_SUPPORTED_JDK_VERSION = 8;
- // for cluster, set read consistency level
- public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
+ // for cluster, set cloneList consistency level
+ public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+cloneList.*level.*";
public static final String ROLE = "Role";
public static final String USER = "User";
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57eedfe..e62b8ea 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -79,7 +79,7 @@ public class IoTDBDescriptor {
return;
}
- LOGGER.info("Start to read config file {}", url);
+ LOGGER.info("Start to cloneList config file {}", url);
Properties properties = new Properties();
try {
properties.load(inputStream);
@@ -261,7 +261,7 @@ public class IoTDBDescriptor {
try {
inputStream.close();
} catch (IOException e) {
- LOGGER.error("Fail to close config file input stream because ", e);
+ LOGGER.error("Fail to setCloseMark config file input stream because ", e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index b5bd219..946a71b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -52,7 +52,7 @@ public abstract class Processor {
}
/**
- * Release the read lock
+ * Release the cloneList lock
*/
public void readUnlock() {
lock.readLock().unlock();
@@ -63,7 +63,7 @@ public abstract class Processor {
}
/**
- * Acquire the read lock
+ * Acquire the cloneList lock
*/
public void readLock() {
lock.readLock().lock();
@@ -91,7 +91,7 @@ public abstract class Processor {
/**
* @param isWriteLock
- * true acquire write lock, false acquire read lock
+ * true acquire write lock, false acquire cloneList lock
*/
public void lock(boolean isWriteLock) {
if (isWriteLock) {
@@ -112,7 +112,7 @@ public abstract class Processor {
/**
* @param isWriteUnlock
- * true release write lock, false release read unlock
+ * true putBack write lock, false putBack cloneList unlock
*/
public void unlock(boolean isWriteUnlock) {
// start = System.currentTimeMillis() - start;
@@ -149,7 +149,7 @@ public abstract class Processor {
}
/**
- * Try to get the read lock
+ * Try to get the cloneList lock
*
* @return
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index a56d29e..a089d0d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -73,21 +73,21 @@ public class UnsealedTsFileProcessorV2 {
protected VersionController versionController;
- private Callback closeUnsealedTsFileProcessor;
+ private Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
/**
* sync this object in query() and asyncFlush()
*/
private final LinkedList<IMemTable> flushingMemTables = new LinkedList<>();
- public UnsealedTsFileProcessorV2(String storageGroupName, File file, FileSchema fileSchema,
- VersionController versionController, Callback closeUnsealedTsFileProcessor)
+ public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
+ VersionController versionController, Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
throws IOException {
this.storageGroupName = storageGroupName;
this.fileSchema = fileSchema;
- this.tsFileResource = new UnsealedTsFileV2(file);
+ this.tsFileResource = new UnsealedTsFileV2(tsfile);
this.versionController = versionController;
- this.writer = new NativeRestorableIOWriter(file);
+ this.writer = new NativeRestorableIOWriter(tsfile);
this.closeUnsealedTsFileProcessor = closeUnsealedTsFileProcessor;
}
@@ -129,11 +129,12 @@ public class UnsealedTsFileProcessorV2 {
/**
* return the memtable to MemTablePool and make metadata in writer visible
*/
- private void removeFlushedMemTable(Object memTable) {
+ private void releaseFlushedMemTable(IMemTable memTable) {
flushQueryLock.writeLock().lock();
try {
writer.makeMetadataVisible();
flushingMemTables.remove(memTable);
+ MemTablePool.getInstance().putBack(memTable);
} finally {
flushQueryLock.writeLock().unlock();
}
@@ -145,20 +146,24 @@ public class UnsealedTsFileProcessorV2 {
}
/**
- * put the workMemtable into flushing list and set null
+ * put the workMemtable into flushing list and set the workMemtable to null
*/
public void asyncFlush() {
flushingMemTables.addLast(workMemTable);
- FlushManager.getInstance().registerBWProcessor(this);
+ FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
workMemTable = null;
}
+ /**
+ * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
+ * the flush manager pool
+ */
public void flushOneMemTable() throws IOException {
IMemTable memTableToFlush = flushingMemTables.pollFirst();
// null memtable only appears when calling forceClose()
if (memTableToFlush != null) {
MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
- this::removeFlushedMemTable);
+ this::releaseFlushedMemTable);
flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
}
@@ -198,7 +203,7 @@ public class UnsealedTsFileProcessorV2 {
flushingMemTables.add(workMemTable);
workMemTable = null;
shouldClose = true;
- FlushManager.getInstance().registerBWProcessor(this);
+ FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
}
public boolean shouldClose() {
@@ -208,7 +213,7 @@ public class UnsealedTsFileProcessorV2 {
return fileSize > fileSizeThreshold;
}
- public void close() {
+ public void setCloseMark() {
shouldClose = true;
}
@@ -226,7 +231,8 @@ public class UnsealedTsFileProcessorV2 {
/**
* get the chunk(s) in the memtable ( one from work memtable and the other ones in flushing status
- * and then compact them into one TimeValuePairSorter). Then get its (or their) ChunkMetadata(s).
+ * and then compact them into one TimeValuePairSorter). Then get the related ChunkMetadatas of
+ * data in disk.
*
* @param deviceId device id
* @param measurementId sensor id
@@ -251,7 +257,7 @@ public class UnsealedTsFileProcessorV2 {
ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(dataType, memSeriesLazyMerger,
Collections.emptyMap());
return new Pair<>(timeValuePairSorter,
- writer.getMetadatas(deviceId, measurementId, dataType));
+ writer.getVisibleMetadatas(deviceId, measurementId, dataType));
} finally {
flushQueryLock.readLock().unlock();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 6cb3e72..c9129ef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -34,9 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.Processor;
@@ -447,7 +445,7 @@ public class BufferWriteProcessor extends Processor {
// switch
if (isCloseTaskCalled) {
LOGGER.info(
- "flushMetadata memtable for bufferwrite processor {} synchronously for close task.",
+ "flushMetadata memtable for bufferwrite processor {} synchronously for setCloseMark task.",
getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
FlushPoolManager.getInstance().getCorePoolSize());
flushTask("synchronously", tmpMemTableToFlush, version, flushId);
@@ -470,7 +468,7 @@ public class BufferWriteProcessor extends Processor {
}
} else {
if (isCloseTaskCalled) {
- MemTablePool.getInstance().release(workMemTable);
+ MemTablePool.getInstance().putBack(workMemTable);
}
flushFuture = new ImmediateFuture<>(true);
}
@@ -486,12 +484,12 @@ public class BufferWriteProcessor extends Processor {
public synchronized void close() throws BufferWriteProcessorException {
try {
// flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
- LOGGER.info("Submit a BufferWrite ({}) close task.", getProcessorName());
+ LOGGER.info("Submit a BufferWrite ({}) setCloseMark task.", getProcessorName());
closeFuture = new BWCloseFuture(FlushPoolManager.getInstance().submit(() -> closeTask()));
//now, we omit the future of the closeTask.
} catch (Exception e) {
LOGGER
- .error("Failed to close the bufferwrite processor when calling the action function.", e);
+ .error("Failed to setCloseMark the bufferwrite processor when calling the action function.", e);
throw new BufferWriteProcessorException(e);
}
}
@@ -505,9 +503,9 @@ public class BufferWriteProcessor extends Processor {
// end file
writer.endFile(fileSchema);
//FIXME suppose the flushMetadata-thread-pool is 2.
- // then if a flushMetadata task and a close task are running in the same time
- // and the close task is faster, then writer == null, and the flushMetadata task will throw nullpointer
- // exception. Add "synchronized" keyword on both flushMetadata and close may solve the issue.
+ // then if a flushMetadata task and a setCloseMark task are running in the same time
+ // and the setCloseMark task is faster, then writer == null, and the flushMetadata task will throw nullpointer
+ // exception. Add "synchronized" keyword on both flushMetadata and setCloseMark may solve the issue.
writer = null;
// update the IntervalFile for interval list
bufferwriteCloseConsumer.accept(this);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
index b3fef58..92b6f84 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
@@ -94,7 +94,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
File insertFile = new File(insertFilePath);
File restoreFile = new File(restoreFilePath);
if (insertFile.exists() && restoreFile.exists()) {
- // read restore file
+ // cloneList restore file
Pair<Long, List<ChunkGroupMetaData>> restoreInfo = readRestoreInfo();
long position = restoreInfo.left;
List<ChunkGroupMetaData> existedMetadatas = restoreInfo.right;
@@ -192,7 +192,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
randomAccessFile = new RandomAccessFile(restoreFilePath, DEFAULT_MODE);
try {
long fileLength = randomAccessFile.length();
- // read tsfile position
+ // cloneList tsfile position
long point = randomAccessFile.getFilePointer();
while (point + TS_POSITION_BYTE_SIZE < fileLength) {
byte[] metadataSizeBytes = new byte[TS_METADATA_BYTE_SIZE];
@@ -205,7 +205,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
groupMetaDatas.addAll(tsDeviceMetadata.getChunkGroupMetaDataList());
point = randomAccessFile.getFilePointer();
}
- // read the tsfile position information using byte[8] which is a long.
+ // cloneList the tsfile position information using byte[8] which is a long.
randomAccessFile.read(lastPostionBytes);
long lastPosition = BytesUtils.bytesToLong(lastPostionBytes);
return new Pair<>(lastPosition, groupMetaDatas);
@@ -255,7 +255,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
/**
* add all appendChunkGroupMetadatas into memory. After calling this method, other classes can
- * read these metadata.
+ * cloneList these metadata.
*/
public void makeMetadataVisible() {
if (!append.isEmpty()) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
index c9d3011..2c7f292 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/FlushManager.java
@@ -37,13 +37,13 @@ public class FlushManager {
// TODO do sth
}
udfProcessor.setManagedByFlushManager(false);
- registerBWProcessor(udfProcessor);
+ registerUnsealedTsFileProcessor(udfProcessor);
};
/**
* Add BufferWriteProcessor to asyncFlush manager
*/
- public boolean registerBWProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ public boolean registerUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
synchronized (unsealedTsFileProcessor) {
if (!unsealedTsFileProcessor.isManagedByFlushManager() && unsealedTsFileProcessor.getFlushingMemTableSize() > 0) {
unsealedTsFileProcessorQueue.add(unsealedTsFileProcessor);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
index 3cfe180..89c5b8d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
@@ -57,7 +57,7 @@ public class TsFileMetaDataCache {
synchronized (internPath) {
cacheRequestNum.incrementAndGet();
if (!cache.containsKey(path)) {
- // read value from tsfile
+ // cloneList value from tsfile
TsFileMetaData fileMetaData = TsFileMetadataUtils.getTsFileMetaData(path);
cache.put(path, fileMetaData);
if (LOGGER.isDebugEnabled()) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java
index 4cd1602..96e8120 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
/**
- * This class is used to read metadata(<code>TsFileMetaData</code> and
+ * This class is used to cloneList metadata(<code>TsFileMetaData</code> and
* <code>TsRowGroupBlockMetaData</code>).
*/
public class TsFileMetadataUtils {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
similarity index 95%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
index 89a8c0a..d3b87fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnWriteLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
@@ -30,7 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
*
* @param <T>
*/
-public class CopyOnWriteLinkedList<T> {
+public class CopyOnReadLinkedList<T> {
LinkedList<T> data = new LinkedList<>();
List<T> readCopy;
@@ -52,7 +52,7 @@ public class CopyOnWriteLinkedList<T> {
readCopy = null;
}
- public synchronized List<T> read() {
+ public synchronized List<T> cloneList() {
if (readCopy == null) {
readCopy = new ArrayList<>(data);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index a9a8064..5dfa089 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -538,7 +538,7 @@ public class FileNodeManager implements IStatistic, IService {
.getConfig().getBufferwriteFileSizeThreshold()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "The filenode processor {} will close the bufferwrite processor, "
+ "The filenode processor {} will setCloseMark the bufferwrite processor, "
+ "because the size[{}] of tsfile {} reaches the threshold {}",
filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
bufferWriteProcessor.getInsertFilePath(), MemUtils.bytesCntToStr(
@@ -548,7 +548,7 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeProcessor.closeBufferWrite();
start3 = System.currentTimeMillis() - start3;
if (start3 > 1000) {
- LOGGER.info("FileNodeManager.insertBufferWrite step-3, close buffer write cost: {}", start3);
+ LOGGER.info("FileNodeManager.insertBufferWrite step-3, setCloseMark buffer write cost: {}", start3);
}
}
}
@@ -821,7 +821,7 @@ public class FileNodeManager implements IStatistic, IService {
return false;
}
}
- // close bufferwrite file
+ // setCloseMark bufferwrite file
fileNodeProcessor.closeBufferWrite();
// append file to storage group.
fileNodeProcessor.appendFile(appendFile, appendFilePath);
@@ -930,7 +930,7 @@ public class FileNodeManager implements IStatistic, IService {
}
/**
- * try to close the filenode processor. The name of filenode processor is processorName
+ * try to setCloseMark the filenode processor. The name of filenode processor is processorName
*/
private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
if (!processorMap.containsKey(processorName)) {
@@ -993,7 +993,7 @@ public class FileNodeManager implements IStatistic, IService {
for (String bufferwritePath : bufferwritePathList) {
bufferwritePath = standardizeDir(bufferwritePath) + processorName;
File bufferDir = new File(bufferwritePath);
- // free and close the streams under this bufferwrite directory
+ // free and setCloseMark the streams under this bufferwrite directory
if (!bufferDir.exists()) {
continue;
}
@@ -1070,7 +1070,7 @@ public class FileNodeManager implements IStatistic, IService {
/**
- * Force to close the filenode processor.
+ * Force to setCloseMark the filenode processor.
*/
public void closeOneFileNode(String processorName) throws FileNodeManagerException {
if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
@@ -1079,10 +1079,10 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
try {
- LOGGER.info("Force to close the filenode processor {}.", processorName);
+ LOGGER.info("Force to setCloseMark the filenode processor {}.", processorName);
while (!closeOneProcessor(processorName)) {
try {
- LOGGER.info("Can't force to close the filenode processor {}, wait 100ms to retry",
+ LOGGER.info("Can't force to setCloseMark the filenode processor {}, wait 100ms to retry",
processorName);
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
@@ -1097,14 +1097,14 @@ public class FileNodeManager implements IStatistic, IService {
}
/**
- * try to close the filenode processor.
+ * try to setCloseMark the filenode processor.
*/
private void close(String processorName) throws FileNodeManagerException {
if (!processorMap.containsKey(processorName)) {
LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
return;
}
- LOGGER.info("Try to close the filenode processor {}.", processorName);
+ LOGGER.info("Try to setCloseMark the filenode processor {}.", processorName);
FileNodeProcessor processor = processorMap.get(processorName);
if (!processor.tryWriteLock()) {
LOGGER.warn("Can't get the write lock of the filenode processor {}.", processorName);
@@ -1153,12 +1153,12 @@ public class FileNodeManager implements IStatistic, IService {
}
/**
- * Try to close All.
+ * Try to setCloseMark All.
*/
public void closeAll() throws FileNodeManagerException {
LOGGER.info("Start closing all filenode processor");
if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- LOGGER.info("Failed to close all filenode processor because of merge operation");
+ LOGGER.info("Failed to setCloseMark all filenode processor because of merge operation");
return;
}
fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
@@ -1268,7 +1268,7 @@ public class FileNodeManager implements IStatistic, IService {
try {
closeAll();
} catch (FileNodeManagerException e) {
- LOGGER.error("Failed to close file node manager because .", e);
+ LOGGER.error("Failed to setCloseMark file node manager because .", e);
}
boolean notFinished = true;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 54191e9..082063a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -151,7 +151,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
//the bufferwrite Processors that are closing. (Because they are not closed well,
// their memtable are not released and we have to query data from them.
//private ConcurrentSkipListSet<BufferWriteProcessor> closingBufferWriteProcessor = new ConcurrentSkipListSet<>();
- private CopyOnWriteLinkedList<BufferWriteProcessor> closingBufferWriteProcessor = new CopyOnWriteLinkedList<>();
+ private CopyOnReadLinkedList<BufferWriteProcessor> closingBufferWriteProcessor = new CopyOnReadLinkedList<>();
private OverflowProcessor overflowProcessor = null;
private Set<Integer> oldMultiPassTokenSet = null;
@@ -503,7 +503,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
// re-merge all file
- // if bufferwrite processor is not null, and close
+ // if bufferwrite processor is not null, and setCloseMark
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
getProcessorName(), isMerging);
merge();
@@ -738,7 +738,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* add multiple pass lock.
*/
public int addMultiPassCount() {
- LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
+ LOGGER.debug("Add MultiPassCount: cloneList lock newMultiPassCount.");
newMultiPassCount.incrementAndGet();
while (newMultiPassTokenSet.contains(multiPassLockToken)) {
multiPassLockToken++;
@@ -1014,7 +1014,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
/**
- * Prepare for merge, close the bufferwrite and overflow.
+ * Prepare for merge, setCloseMark the bufferwrite and overflow.
*/
private void prepareForMerge() {
try {
@@ -1026,7 +1026,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
getProcessorName());
// try to get overflow processor
getOverflowProcessor(getProcessorName());
- // must close the overflow processor
+ // must setCloseMark the overflow processor
while (!getOverflowProcessor().canBeClosed()) {
waitForClosing();
}
@@ -1056,7 +1056,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Merge this storage group, merge the tsfile data with overflow data.
*/
public void merge() throws FileNodeProcessorException {
- // close bufferwrite and overflow, prepare for merge
+ // setCloseMark bufferwrite and overflow, prepare for merge
LOGGER.info("The filenode processor {} begins to merge.", getProcessorName());
writeLock();
prepareForMerge();
@@ -1808,7 +1808,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
@Override
public void close() throws FileNodeProcessorException {
- LOGGER.info("Will close FileNode Processor {}.", getProcessorName());
+ LOGGER.info("Will setCloseMark FileNode Processor {}.", getProcessorName());
Future<Boolean> result = closeBufferWrite();
try {
result.get();
@@ -2017,8 +2017,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
- public CopyOnWriteLinkedList<BufferWriteProcessor> getClosingBufferWriteProcessor() {
- for (BufferWriteProcessor processor: closingBufferWriteProcessor.read()) {
+ public CopyOnReadLinkedList<BufferWriteProcessor> getClosingBufferWriteProcessor() {
+ for (BufferWriteProcessor processor: closingBufferWriteProcessor.cloneList()) {
if (processor.isClosed()) {
closingBufferWriteProcessor.remove(processor);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
index 0873d69..4cc55ea 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
* FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite close.
+ * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite close.
* emptyTsFileResource and newFileNodes are changed and stored by Overflow flushMetadata and
- * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
+ * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
* status such as "work->merge merge->wait wait->work". numOfMergeFile is changed
* and stored when FileNodeProcessor's status changes from work to merge.
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
index b5d221f..01e768f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
@@ -58,7 +58,7 @@ public class TsFileResource {
private transient ModificationFile modFile;
/**
- * @param autoRead whether read the file to initialize startTimeMap and endTimeMap
+ * @param autoRead whether to read the file to initialize startTimeMap and endTimeMap
*/
public TsFileResource(File file, boolean autoRead) throws IOException {
this(new HashMap<>(), new HashMap<>(), OverflowChangeType.NO_CHANGE, file);
@@ -66,7 +66,7 @@ public class TsFileResource {
//init startTime and endTime
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) {
if (reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
- //this is a complete tsfile, and we can read the metadata directly.
+ //this is a complete tsfile, and we can read the metadata directly.
for (Map.Entry<String, TsDeviceMetadataIndex> deviceEntry : reader.readFileMetadata()
.getDeviceMap().entrySet()) {
startTimeMap.put(deviceEntry.getKey(), deviceEntry.getValue().getStartTime());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index a42d4d4..3d0ad48 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -152,10 +152,10 @@ public class FileNodeManagerV2 implements IService {
private void closeAllFileNodeProcessor() {
synchronized (processorMap) {
- LOGGER.info("Start to close all FileNode");
+ LOGGER.info("Start to close all FileNode");
if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
LOGGER.info(
- "Failed to close all FileNode processor because the FileNodeManager's status is {}",
+ "Failed to close all FileNode processor because the FileNodeManager's status is {}",
fileNodeManagerStatus);
return;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
index d13868d..d16006d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
@@ -33,9 +33,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
* FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite close.
+ * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite close.
* emptyTsFileResource and sequenceFileList are changed and stored by Overflow flushMetadata and
- * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
+ * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
* status such as "work->merge merge->wait wait->work". numOfMergeFile is changed and stored when
* FileNodeProcessor's status changes from work to merge.
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 29ac414..fee51dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -33,7 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
-import org.apache.iotdb.db.engine.filenode.CopyOnWriteLinkedList;
+import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
@@ -66,73 +66,76 @@ public class FileNodeProcessorV2 {
private FileSchema fileSchema;
- // for bufferwrite
- // includes sealed and unsealed tsfiles
+ // includes sealed and unsealed sequence tsfiles
private List<TsFileResourceV2> sequenceFileList;
- private UnsealedTsFileProcessorV2 workBufferWriteProcessor = null;
- private CopyOnWriteLinkedList<UnsealedTsFileProcessorV2> closingBufferWriteProcessor = new CopyOnWriteLinkedList<>();
+ private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
+ private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
- // for overflow
+ // includes sealed and unsealed unsequence tsfiles
private List<TsFileResourceV2> unSequenceFileList;
- private UnsealedTsFileProcessorV2 workOverflowProcessor = null;
- private CopyOnWriteLinkedList<UnsealedTsFileProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
+ private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
+ private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/**
* device -> global latest timestamp of each device
*/
- private Map<String, Long> latestTimeMap;
+ private Map<String, Long> latestTimeForEachDevice;
/**
* device -> largest timestamp of the latest memtable to be submitted to asyncFlush
*/
- private Map<String, Long> latestFlushTimeMap = new HashMap<>();
+ private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
- private String storageGroup;
+ private String storageGroupName;
private final ReadWriteLock lock;
private VersionController versionController;
- private String fileNodeRestoreFilePath;
+ // TODO delete the file path
+ private String absoluteFileNodeRestoreFilePath;
+
private FileNodeProcessorStoreV2 fileNodeProcessorStore;
+
+ // TODO delete this lock
private final Object fileNodeRestoreLock = new Object();
- public FileNodeProcessorV2(String baseDir, String storageGroup)
+ public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
throws FileNodeProcessorException {
- this.storageGroup = storageGroup;
+ this.storageGroupName = storageGroupName;
lock = new ReentrantReadWriteLock();
- File storageGroupDir = new File(baseDir + storageGroup);
+ File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
if (!storageGroupDir.exists()) {
storageGroupDir.mkdir();
LOGGER.info("The directory of the storage group {} doesn't exist. Create a new " +
- "directory {}", storageGroup, storageGroupDir.getAbsolutePath());
+ "directory {}", storageGroupName, storageGroupDir.getAbsolutePath());
}
/**
* restore
*/
- File restoreFolder = new File(baseDir + storageGroup);
+ File restoreFolder = new File(absoluteBaseDir + storageGroupName);
if (!restoreFolder.exists()) {
restoreFolder.mkdirs();
LOGGER.info("The restore directory of the filenode processor {} doesn't exist. Create new " +
- "directory {}", storageGroup, restoreFolder.getAbsolutePath());
+ "directory {}", storageGroupName, restoreFolder.getAbsolutePath());
}
- fileNodeRestoreFilePath = new File(restoreFolder, storageGroup + RESTORE_FILE_SUFFIX).getPath();
+ absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
try {
fileNodeProcessorStore = readStoreFromDiskOrCreate();
} catch (FileNodeProcessorException e) {
LOGGER.error("The fileNode processor {} encountered an error when recovering restore " +
- "information.", storageGroup);
+ "information.", storageGroupName);
throw new FileNodeProcessorException(e);
}
// TODO deep clone the lastupdate time, change the getSequenceFileList to V2
sequenceFileList = fileNodeProcessorStore.getSequenceFileList();
unSequenceFileList = fileNodeProcessorStore.getUnSequenceFileList();
- latestTimeMap = fileNodeProcessorStore.getLatestTimeMap();
+ latestTimeForEachDevice = fileNodeProcessorStore.getLatestTimeMap();
/**
* version controller
@@ -144,7 +147,7 @@ public class FileNodeProcessorV2 {
}
// construct the file schema
- this.fileSchema = constructFileSchema(storageGroup);
+ this.fileSchema = constructFileSchema(storageGroupName);
}
private FileSchema constructFileSchema(String storageGroupName) {
@@ -181,16 +184,17 @@ public class FileNodeProcessorV2 {
private FileNodeProcessorStoreV2 readStoreFromDiskOrCreate() throws FileNodeProcessorException {
synchronized (fileNodeRestoreLock) {
- File restoreFile = new File(fileNodeRestoreFilePath);
+ File restoreFile = new File(absoluteFileNodeRestoreFilePath);
if (!restoreFile.exists() || restoreFile.length() == 0) {
return new FileNodeProcessorStoreV2(false, new HashMap<>(),
new ArrayList<>(), new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
}
- try (FileInputStream inputStream = new FileInputStream(fileNodeRestoreFilePath)) {
+ try (FileInputStream inputStream = new FileInputStream(absoluteFileNodeRestoreFilePath)) {
return FileNodeProcessorStoreV2.deSerialize(inputStream);
} catch (IOException e) {
LOGGER
- .error("Failed to deserialize the FileNodeRestoreFile {}, {}", fileNodeRestoreFilePath,
+ .error("Failed to deserialize the FileNodeRestoreFile {}, {}",
+ absoluteFileNodeRestoreFilePath,
e);
throw new FileNodeProcessorException(e);
}
@@ -201,31 +205,30 @@ public class FileNodeProcessorV2 {
throws FileNodeProcessorException {
synchronized (fileNodeRestoreLock) {
- try (FileOutputStream fileOutputStream = new FileOutputStream(fileNodeRestoreFilePath)) {
+ try (FileOutputStream fileOutputStream = new FileOutputStream(absoluteFileNodeRestoreFilePath)) {
fileNodeProcessorStore.serialize(fileOutputStream);
LOGGER.debug("The filenode processor {} writes restore information to the restore file",
- storageGroup);
+ storageGroupName);
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
}
}
-
public boolean insert(TSRecord tsRecord) {
lock.writeLock().lock();
boolean result;
try {
// init map
- latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
- latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+ latestTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+ latestFlushedTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
// write to sequence or unsequence file
- if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
- result = writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
+ if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
+ result = writeUnsealedDataFile(workUnsealedSequenceTsFileProcessor, tsRecord, true);
} else {
- result = writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
+ result = writeUnsealedDataFile(workUnsealedUnSequenceTsFileProcessor, tsRecord, false);
}
} catch (Exception e) {
LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
@@ -244,15 +247,15 @@ public class FileNodeProcessorV2 {
if (unsealedTsFileProcessor == null) {
if (sequence) {
String baseDir = directories.getNextFolderForTsfile();
- String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
+ String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
+ unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
} else {
// TODO check if the disk is full
String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
- String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
+ String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
+ unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
}
@@ -262,19 +265,20 @@ public class FileNodeProcessorV2 {
result = unsealedTsFileProcessor.write(tsRecord);
// try to update the latest time of the device of this tsRecord
- if (result && latestTimeMap.get(tsRecord.deviceId) < tsRecord.time) {
- latestTimeMap.put(tsRecord.deviceId, tsRecord.time);
+ if (result && latestTimeForEachDevice.get(tsRecord.deviceId) < tsRecord.time) {
+ latestTimeForEachDevice.put(tsRecord.deviceId, tsRecord.time);
}
// check memtable size and may asyncFlush the workMemtable
if (unsealedTsFileProcessor.shouldFlush()) {
- flushAndCheckClose(unsealedTsFileProcessor, sequence);
+ flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
}
return result;
}
+ // TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSourceV2 query(String deviceId, String measurementId) {
List<TsFileResourceV2> sequnceResources = getFileReSourceListForQuery(sequenceFileList,
@@ -320,58 +324,51 @@ public class FileNodeProcessorV2 {
/**
- * ensure there must be a flush thread submitted after close() is called, therefore the close task
+ * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the close task
* will be executed by a flush thread. -- said by qiaojialin
*
* only called by insert(), thread-safety should be ensured by caller
*/
- private void flushAndCheckClose(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
+ private void flushAndCheckShouldClose(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
boolean sequence) {
- boolean shouldClose = false;
- // check file size and may close the BufferWrite
+ // check file size and, if too large, set the close mark of the BufferWrite
if (unsealedTsFileProcessor.shouldClose()) {
if (sequence) {
- closingBufferWriteProcessor.add(unsealedTsFileProcessor);
+ closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+ workUnsealedSequenceTsFileProcessor = null;
} else {
- closingOverflowProcessor.add(unsealedTsFileProcessor);
+ closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+ workUnsealedUnSequenceTsFileProcessor = null;
}
- unsealedTsFileProcessor.close();
- shouldClose = true;
+ unsealedTsFileProcessor.setCloseMark();
}
unsealedTsFileProcessor.asyncFlush();
- if (shouldClose) {
- if (sequence) {
- workBufferWriteProcessor = null;
- } else {
- workOverflowProcessor = null;
- }
- }
-
// update the largest timestamp in the last flushing memtable
- for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
- latestFlushTimeMap.put(entry.getKey(), entry.getValue());
+ for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
+ latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
}
}
/**
- * return the memtable to MemTablePool and make metadata in writer visible
+ * put the memtable back to the MemTablePool and make the metadata in writer visible
*/
- private void closeUnsealedTsFileProcessorCallBack(Object bufferWriteProcessor) {
- closingBufferWriteProcessor.remove((UnsealedTsFileProcessorV2) bufferWriteProcessor);
+ // TODO please consider concurrency with query and write method.
+ private void closeUnsealedTsFileProcessorCallBack(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
+ closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setLatestTimeMap(latestTimeMap);
+ fileNodeProcessorStore.setLatestTimeMap(latestTimeForEachDevice);
if (!sequenceFileList.isEmpty()) {
// end time with one start time
Map<String, Long> endTimeMap = new HashMap<>();
- TsFileResourceV2 resource = workBufferWriteProcessor.getTsFileResource();
+ TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
synchronized (resource) {
for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
String deviceId = startTime.getKey();
- endTimeMap.put(deviceId, latestTimeMap.get(deviceId));
+ endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
}
resource.setEndTimeMap(endTimeMap);
}
@@ -388,10 +385,10 @@ public class FileNodeProcessorV2 {
public void forceClose() {
lock.writeLock().lock();
try {
- if (workBufferWriteProcessor != null) {
- closingBufferWriteProcessor.add(workBufferWriteProcessor);
- workBufferWriteProcessor.forceClose();
- workBufferWriteProcessor = null;
+ if (workUnsealedSequenceTsFileProcessor != null) {
+ closingSequenceTsFileProcessor.add(workUnsealedSequenceTsFileProcessor);
+ workUnsealedSequenceTsFileProcessor.forceClose();
+ workUnsealedSequenceTsFileProcessor = null;
}
} finally {
lock.writeLock().unlock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
index e490044..220d7bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
@@ -130,7 +130,7 @@ public abstract class BasicMemController implements IService {
public abstract void clear();
/**
- * close MemController.
+ * close MemController.
*/
public void close() {
logger.info("MemController exiting");
@@ -170,9 +170,9 @@ public abstract class BasicMemController implements IService {
/**
* When the memories held by one object (like OverflowProcessor or BufferWriteProcessor) is no
- * more useful, this object should call this method to release the memories.
+ more useful, this object should call this method to release the memories.
* @param user an object that holds some memory as a buffer or anything.
- * @param freeSize how many bytes does the object want to release.
+ * @param freeSize how many bytes does the object want to release.
*/
public abstract void releaseUsage(Object user, long freeSize);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
index 6e23d1e..1706203 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.engine.memtable;
-public interface Callback {
+public interface Callback<T> {
- void call(Object... object);
+ void call(T object);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index b6ef3a4..967064c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -60,7 +60,7 @@ public interface IMemTable {
Map<String, String> props);
/**
- * release all the memory resources.
+ * release all the memory resources.
*/
void clear();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index ea053c4..5d7e918 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -150,7 +150,7 @@ public class MemTableFlushTask {
}
}
- MemTablePool.getInstance().release(memTable);
+ MemTablePool.getInstance().putBack(memTable);
LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
if (tsFileIoWriter instanceof RestorableTsFileIOWriter) {
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index ef3a255..a5e229a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -17,6 +17,7 @@ package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,11 +43,11 @@ public class MemTableFlushTaskV2 {
private boolean stop = false;
private String processorName;
- private Callback flushCallBack;
+ private Callback<IMemTable> flushCallBack;
private IMemTable memTable;
public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String processorName,
- Callback callBack) {
+ Callback<IMemTable> callBack) {
this.tsFileIoWriter = writer;
this.processorName = processorName;
this.flushCallBack = callBack;
@@ -130,7 +131,7 @@ public class MemTableFlushTaskV2 {
}
}
- MemTablePool.getInstance().release(memTable);
+ MemTablePool.getInstance().putBack(memTable);
LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
tsFileIoWriter.makeMetadataVisible();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index c85719b..349f790 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -32,6 +32,7 @@ public class MemTablePool {
/**
* >= number of storage group * 2
+ * TODO check this parameter to ensure that capacity * max memtable size < JVM memory / 2
*/
private static final int capacity = IoTDBDescriptor.getInstance().getConfig()
.getMaxActiveMemTableSize();
@@ -76,7 +77,7 @@ public class MemTablePool {
}
}
- public void release(IMemTable memTable) {
+ public void putBack(IMemTable memTable) {
synchronized (emptyMemTables) {
memTable.clear();
emptyMemTables.push(memTable);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 80c6224..4042654 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -637,7 +637,7 @@ public class OverflowProcessor extends Processor {
if (isClosed) {
return;
}
- LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
+ LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
long closeStartTime = System.currentTimeMillis();
// asyncFlush data
try {
@@ -651,11 +651,11 @@ public class OverflowProcessor extends Processor {
throw new OverflowProcessorException(e);
}
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
- // log close time
+ LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
+ // log close time
long closeEndTime = System.currentTimeMillis();
LOGGER.info(
- "The close operation of overflow processor {} starts at {} and ends at {}."
+ "The close operation of overflow processor {} starts at {} and ends at {}."
+ " It consumes {}ms.",
getProcessorName(), DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
@@ -712,7 +712,7 @@ public class OverflowProcessor extends Processor {
/**
* Check whether current overflow file contains too many metadata or size of current overflow file
- * is too large If true, close current file and open a new one.
+ * is too large. If true, close current file and open a new one.
*/
private boolean checkSize() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 6cc5fd1..d5c20c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -158,7 +158,7 @@ public class OverflowResource {
}
private void readMetadata() throws IOException {
- // read insert meta-data
+ // read insert meta-data
insertIO.toTail();
long position = insertIO.getPos();
while (position != TsFileIOWriter.magicStringBytes.length) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
index 5585119..54851ad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergePoolManager.java
@@ -60,7 +60,7 @@ public class MergePoolManager {
/**
* Refuse new merge submits and exit when all RUNNING THREAD in the pool end.
*
- * @param block if set block to true, this method will wait for timeOut milliseconds to close the
+ * @param block if set block to true, this method will wait for timeOut milliseconds to close the
* merge pool. false, return directly.
* @param timeout block time out in milliseconds.
* @throws ProcessorException if timeOut reach or interrupted while waiting to exit.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 5f7aac8..c256e59 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -82,7 +82,7 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter {
break;
}
}
- //release memory
+ //release memory
memSeries = null;
initialized = true;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index c45a3f0..dc67fda 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -62,7 +62,7 @@ public class MManager {
private static final String ROOT_NAME = MetadataConstant.ROOT;
public static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n";
- // the lock for read/write
+ // the lock for read/write
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// The file storing the serialize info for metadata
private String datafilePath;
@@ -144,7 +144,7 @@ public class MManager {
} catch (PathErrorException | MetadataArgsErrorException
| ClassNotFoundException | IOException e) {
mgraph = new MGraph(ROOT_NAME);
- LOGGER.error("Cannot read MGraph from file, using an empty new one");
+ LOGGER.error("Cannot read MGraph from file, using an empty new one");
} finally {
lock.writeLock().unlock();
}
@@ -955,7 +955,7 @@ public class MManager {
try(FileOutputStream fos = new FileOutputStream(tempFile);
ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(mgraph);
- // close the logFile stream
+ // close the logFile stream
if (logWriter != null) {
logWriter.close();
logWriter = null;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 052c324..7888b6f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -284,7 +284,7 @@ public class StatMonitor implements IService {
/**
- * close statistic service.
+ * close statistic service.
*/
public void close() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index e772bf5..75f6e6f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -535,7 +535,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
closeFileNodes.removeAll(deleteFielNodes);
for (String deleteFileNode : deleteFielNodes) {
- // close processor
+ // close processor
fileNodeManager.deleteOneFileNode(deleteFileNode);
}
for (String closeFileNode : closeFileNodes) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
index 4fcaac6..9435be1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
@@ -73,10 +73,10 @@ public class CodecInstances {
}
/**
- * Read a string value from ByteBuffer, first read a int that represents the bytes len of string,
- * then read bytes len value, finally transfer bytes to string, string may be null
+ * Read a string value from ByteBuffer, first read an int that represents the bytes len of string,
+ * then read bytes len value, finally transfer bytes to string, string may be null
*
- * @param buffer ByteBuffer to be read
+ * @param buffer ByteBuffer to be read
* @return string value
*/
static String readString(ByteBuffer buffer) {
@@ -122,7 +122,7 @@ public class CodecInstances {
@Override
public DeletePlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type".
+ buffer.get(); // read and skip an int representing "type".
long time = buffer.getLong();
String path = readString(buffer);
@@ -154,7 +154,7 @@ public class CodecInstances {
@Override
public UpdatePlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
int timeListBytesLength = buffer.getInt();
List<Pair<Long, Long>> timeArrayList = new ArrayList<>(timeListBytesLength);
@@ -203,7 +203,7 @@ public class CodecInstances {
public InsertPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
int insertType = buffer.get();
long time = buffer.getLong();
@@ -295,7 +295,7 @@ public class CodecInstances {
public MetadataPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
byte namespaceTypeByte = buffer.get();
MetadataOperator.NamespaceType namespaceType = null;
@@ -380,7 +380,7 @@ public class CodecInstances {
public AuthorPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
AuthorOperator.AuthorType authorType = AuthorOperator.AuthorType.deserialize(buffer.get());
String userName = readString(buffer);
@@ -427,7 +427,7 @@ public class CodecInstances {
public LoadDataPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
String inputFilePath = readString(buffer);
String measureType = readString(buffer);
@@ -459,7 +459,7 @@ public class CodecInstances {
public PropertyPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
- buffer.get(); // read and skip an int representing "type"
+ buffer.get(); // read and skip an int representing "type"
PropertyOperator.PropertyType propertyType = PropertyOperator.PropertyType
.deserialize(buffer.get());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index f50867f..6c11cc3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -64,7 +64,7 @@ public abstract class AggregateFunction {
*
* @param dataInThisPage the data in the DataPage
* @param unsequenceReader unsequence data reader
- * @throws IOException TsFile data read exception
+ * @throws IOException TsFile data read exception
* @throws ProcessorException wrong aggregation method parameter
*/
public abstract void calculateValueFromPageData(BatchData dataInThisPage,
@@ -79,7 +79,7 @@ public abstract class AggregateFunction {
* @param dataInThisPage the data in the DataPage
* @param unsequenceReader unsequence data reader
* @param bound the time upper bounder of data in unsequence data reader
- * @throws IOException TsFile data read exception
+ * @throws IOException TsFile data read exception
* @throws ProcessorException wrong aggregation method parameter
*/
public abstract void calculateValueFromPageData(BatchData dataInThisPage,
@@ -102,7 +102,7 @@ public abstract class AggregateFunction {
*
* @param unsequenceReader unsequence data reader
* @param bound the time upper bounder of data in unsequence data reader
- * @throws IOException TsFile data read exception
+ * @throws IOException TsFile data read exception
*/
public abstract void calculateValueFromUnsequenceReader(IPointReader unsequenceReader, long bound)
throws IOException, ProcessorException;
@@ -112,7 +112,7 @@ public abstract class AggregateFunction {
* This method is calculate the aggregation using the common timestamps of cross series filter.
* </p>
*
- * @throws IOException TsFile data read error
+ * @throws IOException TsFile data read error
*/
public abstract void calcAggregationUsingTimestamps(long[] timestamps, int length,
EngineReaderByTimeStamp dataReader) throws IOException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index cfb06ad..c7b4be8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -54,7 +54,7 @@ public class QueryContext {
}
/**
- * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read
+ * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read
* them from 'modFile' and put then into the cache.
*/
public List<Modification> getPathModifications(ModificationFile modFile, String path)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index cd33c91..5dc6879 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -107,7 +107,7 @@ public class FileReaderManager implements IService {
try {
reader.close();
} catch (IOException e) {
- LOGGER.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
+ LOGGER.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
readerMap.remove(entry.getKey());
refMap.remove(entry.getKey());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index c5f029e..4f7a0bd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -48,7 +48,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
* 3. (if using filter)beginQueryOfGivenExpression
* - remind FileNodeManager that some files are being used
* 4. getQueryDataSource - open files for the job or reuse existing readers.
- * 5. endQueryForGivenJob - release the resource used by this job.
+ * 5. endQueryForGivenJob - release the resource used by this job.
* </p>
*/
public class QueryResourceManager {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
index 6545bfa..5a76ad0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
@@ -47,6 +47,6 @@ public class AggreResultDataPointReader implements IPointReader {
@Override
public void close() {
- // batch data doesn't need to close.
+ // batch data doesn't need to close.
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index d64b49a..83291b7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -117,7 +117,7 @@ public abstract class IFill {
@Override
public void close() {
- // no need to close
+ // no need to close
}
}
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java
index 2982a97..d0603f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.utils.TimeValuePair;
/**
- * Vital read interface. Batch method is used to increase query speed. Getting a batch of data a
+ * Vital read interface. Batch method is used to increase query speed. Getting a batch of data a
* time is faster than getting one point a time.
*/
public interface IReader {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
index 0e7cb8a..5db4a78 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A service that triggers close and merge operation regularly.
+ * A service that triggers close and merge operation regularly.
*
* @author liukun
*/
@@ -76,35 +76,35 @@ public class CloseMergeService implements IService {
public void startService() {
if (dbConfig.isEnableTimingCloseAndMerge()) {
if (!isStart) {
- LOGGER.info("Start the close and merge service");
+ LOGGER.info("Start the close and merge service");
closeAndMergeDaemon.start();
isStart = true;
closeAllLastTime = System.currentTimeMillis();
mergeAllLastTime = System.currentTimeMillis();
} else {
- LOGGER.warn("The close and merge service has been already running.");
+ LOGGER.warn("The close and merge service has been already running.");
}
} else {
- LOGGER.info("Cannot start close and merge service, it is disabled by configuration.");
+ LOGGER.info("Cannot start close and merge service, it is disabled by configuration.");
}
}
/**
- * close service.
+ * close service.
*/
public void closeService() {
if (dbConfig.isEnableTimingCloseAndMerge()) {
if (isStart) {
- LOGGER.info("Prepare to shutdown the close and merge service.");
+ LOGGER.info("Prepare to shutdown the close and merge service.");
isStart = false;
synchronized (service) {
service.shutdown();
service.notifyAll();
}
resetCloseMergeService();
- LOGGER.info("Shutdown close and merge service successfully.");
+ LOGGER.info("Shutdown close and merge service successfully.");
} else {
- LOGGER.warn("The close and merge service is not running now.");
+ LOGGER.warn("The close and merge service is not running now.");
}
}
}
@@ -198,13 +198,13 @@ public class CloseMergeService implements IService {
IoTDBDescriptor.getInstance().getConfig().getZoneID());
long timeInterval = thisCloseTime - closeAllLastTime;
LOGGER.info(
- "Start the close action regularly, last time is {}, this time is {}, "
+ "Start the close action regularly, last time is {}, this time is {}, "
+ "time interval is {}s.", startDateTime, endDateTime, timeInterval / 1000);
closeAllLastTime = System.currentTimeMillis();
try {
FileNodeManager.getInstance().closeAll();
} catch (Exception e) {
- LOGGER.error("close all error.", e);
+ LOGGER.error("close all error.", e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IService.java
index c73ec07..222f3c9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IService.java
@@ -29,7 +29,7 @@ public interface IService {
/**
* Stop current service. If current service uses thread or thread pool,
- * current service should guarantee to release thread or thread pool.
+ * current service should guarantee to release thread or thread pool.
*/
void stop();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index a18e263..f75c350 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -165,9 +165,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
try {
stopLatch.await();
reset();
- LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
} catch (InterruptedException e) {
- LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
}
}
@@ -228,7 +228,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
threadStopLatch.countDown();
}
- LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
+ LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
IoTDBConstant.GLOBAL_DB_NAME,
getID().getName());
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JMXService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JMXService.java
index 9ddc9da..d2cfaaa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JMXService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JMXService.java
@@ -147,7 +147,7 @@ public class JMXService implements IService {
if (jmxConnectorServer != null) {
try {
jmxConnectorServer.stop();
- LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME,
+ LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
} catch (IOException e) {
LOGGER.error("Failed to stop {} because of: ", this.getID().getName(), e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 20f9cdf..7aec3f9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -165,7 +165,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSCloseSessionResp closeSession(TSCloseSessionReq req) throws TException {
- LOGGER.info("{}: receive close session", IoTDBConstant.GLOBAL_DB_NAME);
+ LOGGER.info("{}: receive close session", IoTDBConstant.GLOBAL_DB_NAME);
TS_Status tsStatus;
if (username.get() == null) {
tsStatus = new TS_Status(TS_StatusCode.ERROR_STATUS);
@@ -190,7 +190,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSCloseOperationResp closeOperation(TSCloseOperationReq req) throws TException {
- LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
+ LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
try {
releaseQueryResource(req);
@@ -548,7 +548,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
statement = statement.toLowerCase().trim();
if (Pattern.matches(IoTDBConstant.SET_READ_CONSISTENCY_LEVEL_PATTERN, statement)) {
throw new Exception(
- "IoTDB Stand-alone version does not support setting read-write consistency level");
+ "IoTDB Stand-alone version does not support setting read-write consistency level");
} else {
return false;
}
@@ -695,7 +695,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
break;
default:
- throw new TException("not support " + type + " in new read process");
+ throw new TException("not support " + type + " in new read process");
}
}
return resp;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
index ec1dc03..7c5dd0f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
@@ -79,7 +79,7 @@ public class SyncSenderDescriptor {
return;
}
- LOGGER.info("Start to read sync config file {}", url);
+ LOGGER.info("Start to read sync config file {}", url);
Properties properties = new Properties();
try {
properties.load(inputStream);
@@ -120,7 +120,7 @@ public class SyncSenderDescriptor {
try {
inputStream.close();
} catch (IOException e) {
- LOGGER.error("Fail to close sync config file input stream because ", e);
+ LOGGER.error("Fail to close sync config file input stream because ", e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index ad2722d..439a9cc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -255,7 +255,7 @@ public class SyncServiceImpl implements SyncService.Iface {
operation(metadataOperation);
}
} catch (FileNotFoundException e) {
- logger.error("Cannot read the file {}.",
+ logger.error("Cannot read the file {}.",
schemaFromSenderPath.get(), e);
return false;
} catch (IOException e) {
@@ -411,7 +411,7 @@ public class SyncServiceImpl implements SyncService.Iface {
endTimeMap.put(key, device.getEndTime());
}
} catch (IOException e) {
- logger.error("Unable to read tsfile {}", fileTF.getPath());
+ logger.error("Unable to read tsfile {}", fileTF.getPath());
throw new IOException(e);
} finally {
try {
@@ -419,7 +419,7 @@ public class SyncServiceImpl implements SyncService.Iface {
reader.close();
}
} catch (IOException e) {
- logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+ logger.error("Cannot close tsfile stream {}", fileTF.getPath());
throw new IOException(e);
}
}
@@ -575,7 +575,7 @@ public class SyncServiceImpl implements SyncService.Iface {
reader.close();
}
} catch (IOException e) {
- logger.error("Cannot close file stream {}", filePath, e);
+ logger.error("Cannot close file stream {}", filePath, e);
}
}
}
@@ -662,7 +662,7 @@ public class SyncServiceImpl implements SyncService.Iface {
try {
closeReaders(tsfilesReaders);
} catch (IOException e) {
- logger.error("Cannot close file stream {}", filePath, e);
+ logger.error("Cannot close file stream {}", filePath, e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index 47e8f10..15a500d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -110,9 +110,9 @@ public class SyncServiceManager implements IService {
try {
stopLatch.await();
resetLatch();
- LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
} catch (InterruptedException e) {
- LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
}
}
@@ -172,7 +172,7 @@ public class SyncServiceManager implements IService {
if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
threadStopLatch.countDown();
}
- LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
+ LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
index 0fbfac4..88e4652 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
@@ -300,7 +300,7 @@ public class SyncSenderImpl implements SyncSender {
try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
uuid = bf.readLine();
} catch (IOException e) {
- LOGGER.error("Cannot read UUID from file{}", file.getPath());
+ LOGGER.error("Cannot read UUID from file{}", file.getPath());
throw new IOException(e);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/IOUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
index 405339d..bd04733 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
@@ -83,12 +83,12 @@ public class IOUtils {
/**
* Read a string from the given stream.
*
- * @param inputStream the source to read.
+ * @param inputStream the source to read.
* @param encoding string encoding like 'utf-8'.
* @param strBufferLocal a ThreadLocal buffer may be passed to avoid
* frequently memory allocations. A null may also be passed
* to use a local buffer.
- * @return a string read from the stream.
+ * @return a string read from the stream.
* @throws IOException when an exception raised during operating the stream.
*/
public static String readString(DataInputStream inputStream, String encoding,
@@ -116,12 +116,12 @@ public class IOUtils {
/**
* Read a PathPrivilege from the given stream.
*
- * @param inputStream the source to read.
+ * @param inputStream the source to read.
* @param encoding string encoding like 'utf-8'.
* @param strBufferLocal a ThreadLocal buffer may be passed to avoid
* frequently memory allocations. A null may also be passed
* to use a local buffer.
- * @return a PathPrivilege read from the stream.
+ * @return a PathPrivilege read from the stream.
* @throws IOException when an exception raised during operating the stream.
*/
public static PathPrivilege readPathPrivilege(DataInputStream inputStream, String encoding,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
index 69a6935..895bfa7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
@@ -116,7 +116,7 @@ public class LoadDataUtils {
lineCount++;
}
} catch (IOException e1) {
- logger.error("read line from inputCsvFileReader failed:{}", inputCsvDataPath, e1);
+ logger.error("read line from inputCsvFileReader failed:{}", inputCsvDataPath, e1);
extraDataFilePath = null;
} finally {
logger.info("write line:{}", lineCount);
@@ -187,7 +187,7 @@ public class LoadDataUtils {
extraDataFileWriter.close();
}
} catch (IOException e) {
- logger.error("close inputCsvFileReader and extraDataFileWriter failed", e);
+ logger.error("close inputCsvFileReader and extraDataFileWriter failed", e);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/RecoverStage.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/RecoverStage.java
index 47c32d8..5aafdf6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/RecoverStage.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/RecoverStage.java
@@ -37,7 +37,7 @@ public enum RecoverStage {
RECOVER_FILE,
/**
- * In this stage, the mission is to read logs from wal and wal-old files (if exists) and replay
+ * In this stage, the mission is to read logs from wal and wal-old files (if exists) and replay
* them. Should SET flag afterward,
*/
REPLAY_LOG,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index 8ccae9c..e39aeab 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -93,7 +93,7 @@ public class SingleFileLogReader implements ILogReader {
try {
logStream.close();
} catch (IOException e) {
- logger.error("Cannot close log file {}", filepath, e);
+ logger.error("Cannot close log file {}", filepath, e);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 121426a..04b41d4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -120,7 +120,7 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
try {
node.close();
} catch (IOException e) {
- logger.error("{} failed to close", node.toString(), e);
+ logger.error("{} failed to close", node.toString(), e);
}
}
nodeMap.clear();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 4036446..82e8017 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -110,11 +110,11 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
logger.debug("Log node {} closed successfully", identifier);
} catch (IOException e) {
- logger.error("Cannot close log node {} because:", identifier, e);
+ logger.error("Cannot close log node {} because:", identifier, e);
}
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL close cost {} ms", elapse);
+ logger.info("WAL close cost {} ms", elapse);
}
unlockForForceOther();
unlockForOther();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index 60ab059..f9c5f35 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -36,7 +36,7 @@ public interface WriteLogNode {
void write(PhysicalPlan plan) throws IOException;
/**
- * Sync and close streams.
+ * Sync and close streams.
*/
void close() throws IOException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 86f63d4..37b8c6f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -72,7 +72,7 @@ public class TsFileRecoverPerformer {
try {
restorableTsFileIOWriter.endFile(fileSchema);
} catch (IOException e) {
- throw new ProcessorException("Cannot close file when recovering", e);
+ throw new ProcessorException("Cannot close file when recovering", e);
}
removeTruncatePosition(insertFile);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
index c9375da..cb3cbd0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
@@ -102,7 +102,7 @@ public class BufferWriteBenchmark {
// }
// }
// long endTime = System.currentTimeMillis();
-// bufferWriteProcessor.close();
+// bufferWriteProcessor.setCloseMark();
// System.out.println(String.format(
// "Num of time series: %d, " + "Num of points for each time series: %d, "
// + "The total time: %d ms. ",
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
index 28eefb0..e54a80d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
@@ -78,7 +78,7 @@ public class OverflowResourceTest {
ChunkMetaData chunkMetaData = chunkMetaDatas.get(0);
assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
assertEquals(OverflowTestUtils.measurementId1, chunkMetaData.getMeasurementUid());
- // close
+ // close
work.close();
// append file
long originlength = insertFile.length();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 31f12fd..84df8d5 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -74,7 +74,7 @@ public class EnvironmentUtils {
// clean filenode manager
try {
if (!FileNodeManager.getInstance().deleteAll()) {
- LOGGER.error("Can't close the filenode manager in EnvironmentUtils");
+ LOGGER.error("Can't close the filenode manager in EnvironmentUtils");
Assert.fail();
}
} catch (FileNodeManagerException e) {
@@ -87,7 +87,7 @@ public class EnvironmentUtils {
// clean cache
TsFileMetaDataCache.getInstance().clear();
RowGroupBlockMetaDataCache.getInstance().clear();
- // close metadata
+ // close metadata
MManager.getInstance().clear();
MManager.getInstance().flushObjectToFile();
// delete all directory
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/OpenFileNumUtilTest.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/OpenFileNumUtilTest.java
index 7a616f1..75e12c3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/OpenFileNumUtilTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/OpenFileNumUtilTest.java
@@ -78,7 +78,7 @@ public class OpenFileNumUtilTest {
@After
public void tearDown() {
- // close FileWriter
+ // close FileWriter
for (FileWriter fw : fileWriterList) {
try {
fw.close();
@@ -272,7 +272,7 @@ public class OpenFileNumUtilTest {
}
totalOpenFileNumAfter = openFileNumUtil.get(OpenFileNumStatistics.DATA_OPEN_FILE_NUM);
totalOpenFileNumChange = totalOpenFileNumAfter - totalOpenFileNumBefore;
- // close FileWriter shall cause total open file number decrease by testFileNum
+ // close FileWriter shall cause total open file number decrease by testFileNum
if (openFileNumUtil.isCommandValid()) {
assertEquals(-testFileNum, totalOpenFileNumChange);
} else {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index a5fa4da..f2e0d01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -115,7 +115,7 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
* @param dataType the value type
* @return chunks' metadata
*/
- public List<ChunkMetaData> getMetadatas(String deviceId, String measurementId, TSDataType dataType) {
+ public List<ChunkMetaData> getVisibleMetadatas(String deviceId, String measurementId, TSDataType dataType) {
List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) {
for (ChunkMetaData chunkMetaData : metadatas.get(deviceId).get(measurementId)) {