You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/08 11:46:00 UTC
[iotdb] 01/01: Revert "Execute compaction after sync and load is
called (#2439)"
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch revert-2439-execute_compaction_after_sync_and_load
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8f5b360dd41dd23623e869b014a36dc6f747abd
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Fri Jan 8 19:45:41 2021 +0800
Revert "Execute compaction after sync and load is called (#2439)"
This reverts commit 38eb730d735a3e1f3b0beaa956a50e4ea37f9d50.
---
.../engine/storagegroup/StorageGroupProcessor.java | 69 ++++++++++------------
1 file changed, 32 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfbd87a..fdf623a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -819,11 +819,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -982,9 +982,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1451,9 +1451,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws IOException {
@@ -1679,7 +1679,25 @@ public class StorageGroupProcessor {
}
logger.info("signal closing storage group condition in {}", storageGroupName);
- executeCompaction(tsFileProcessor.getTimeRangeId());
+ if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
+ .isTerminated()) {
+ compactionMergeWorking = true;
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+          // fork and filter current tsfile, then commit them to compaction merge
+ tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(
+ tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
+ tsFileProcessor.getTimeRangeId()));
+ } catch (IOException | RejectedExecutionException e) {
+ this.closeCompactionMergeCallBack();
+ logger.error("{} compaction submit task failed", storageGroupName);
+ }
+ } else {
+ logger.info("{} last compaction merge task is working, skip current merge",
+ storageGroupName);
+ }
}
/**
@@ -1862,29 +1880,6 @@ public class StorageGroupProcessor {
tsFileManagement.writeUnlock();
writeUnlock();
}
-
- executeCompaction(newFilePartitionId);
- }
-
- private void executeCompaction(long timePartition) {
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
- .isTerminated()) {
- compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
- try {
-        // fork and filter current tsfile, then commit them to compaction merge
- tsFileManagement.forkCurrentFileList(timePartition);
- CompactionMergeTaskPoolManager.getInstance().submitTask(
- tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
- timePartition));
- } catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
- logger.error("{} compaction submit task failed", storageGroupName);
- }
- } else {
- logger.info("{} last compaction merge task is working, skip current merge",
- storageGroupName);
- }
}
/**
@@ -2086,9 +2081,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2154,8 +2149,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.