You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/08 06:19:21 UTC
[iotdb] branch master updated: Execute compaction after sync and
load is called (#2439)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 38eb730 Execute compaction after sync and load is called (#2439)
38eb730 is described below
commit 38eb730d735a3e1f3b0beaa956a50e4ea37f9d50
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jan 8 14:19:06 2021 +0800
Execute compaction after sync and load is called (#2439)
---
.../engine/storagegroup/StorageGroupProcessor.java | 69 ++++++++++++----------
1 file changed, 37 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index fdf623a..bfbd87a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -819,11 +819,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -982,9 +982,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1451,9 +1451,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws IOException {
@@ -1679,25 +1679,7 @@ public class StorageGroupProcessor {
}
logger.info("signal closing storage group condition in {}", storageGroupName);
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
- .isTerminated()) {
- compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
- try {
- // fork and filter current tsfile, then commit then to compaction merge
- tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
- tsFileProcessor.getTimeRangeId()));
- } catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
- logger.error("{} compaction submit task failed", storageGroupName);
- }
- } else {
- logger.info("{} last compaction merge task is working, skip current merge",
- storageGroupName);
- }
+ executeCompaction(tsFileProcessor.getTimeRangeId());
}
/**
@@ -1880,6 +1862,29 @@ public class StorageGroupProcessor {
tsFileManagement.writeUnlock();
writeUnlock();
}
+
+ executeCompaction(newFilePartitionId);
+ }
+
+ private void executeCompaction(long timePartition) {
+ if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
+ .isTerminated()) {
+ compactionMergeWorking = true;
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+ // fork and filter current tsfile, then commit then to compaction merge
+ tsFileManagement.forkCurrentFileList(timePartition);
+ CompactionMergeTaskPoolManager.getInstance().submitTask(
+ tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
+ timePartition));
+ } catch (IOException | RejectedExecutionException e) {
+ this.closeCompactionMergeCallBack();
+ logger.error("{} compaction submit task failed", storageGroupName);
+ }
+ } else {
+ logger.info("{} last compaction merge task is working, skip current merge",
+ storageGroupName);
+ }
}
/**
@@ -2081,9 +2086,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2149,8 +2154,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.