You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/25 09:46:08 UTC
[incubator-iotdb] branch dev_new_merge_strategy updated: fit new
strategy in previous code
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
new a3d3c69 fit new strategy in previous code
a3d3c69 is described below
commit a3d3c69f6dc417b642d6ecd879298b5cd84a070a
Author: jt <jt...@163.com>
AuthorDate: Fri Oct 25 17:45:45 2019 +0800
fit new strategy in previous code
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
...ergeFileSelector.java => BaseFileSelector.java} | 250 +++++++----------
.../iotdb/db/engine/merge/IMergeFileSelector.java | 42 ++-
...rgeFileStrategy.java => IRecoverMergeTask.java} | 12 +-
.../selector => }/MaxSeriesMergeFileSelector.java | 92 +++++--
.../iotdb/db/engine/merge/MergeFileStrategy.java | 67 +++++
.../{squeeze/selector => }/NaivePathSelector.java | 2 +-
.../{MergeLogger.java => InplaceMergeLogger.java} | 4 +-
.../engine/merge/inplace/recover/LogAnalyzer.java | 2 +-
.../inplace/selector/InplaceMaxFileSelector.java | 127 +++++++++
.../selector/MaxSeriesMergeFileSelector.java | 108 --------
.../merge/inplace/selector/NaivePathSelector.java | 53 ----
.../merge/inplace/task/InplaceMergeTask.java | 10 +-
.../engine/merge/inplace/task/MergeFileTask.java | 6 +-
.../merge/inplace/task/MergeMultiChunkTask.java | 16 +-
.../inplace/task/RecoverInplaceMergeTask.java | 11 +-
.../iotdb/db/engine/merge/manage/MergeManager.java | 2 +-
.../engine/merge/squeeze/recover/LogAnalyzer.java | 6 +-
.../{MergeLogger.java => SqueezeMergeLogger.java} | 4 +-
.../squeeze/selector/MaxFileMergeFileSelector.java | 306 ---------------------
.../merge/squeeze/selector/MergeFileStrategy.java | 27 --
.../squeeze/selector/SqueezeMaxFileSelector.java | 140 ++++++++++
.../engine/merge/squeeze/task/MergeSeriesTask.java | 8 +-
.../squeeze/task/RecoverSqueezeMergeTask.java | 10 +-
.../merge/squeeze/task/SqueezeMergeTask.java | 10 +-
.../engine/storagegroup/StorageGroupProcessor.java | 81 ++++--
.../MaxFileMergeFileSelectorTest.java | 48 ++--
.../MaxSeriesMergeFileSelectorTest.java | 64 +++--
.../engine/merge/{ => inplace}/MergeLogTest.java | 3 +-
.../engine/merge/{ => inplace}/MergePerfTest.java | 13 +-
.../engine/merge/{ => inplace}/MergeTaskTest.java | 3 +-
.../engine/merge/{ => squeeze}/MergeTaskTest.java | 5 +-
.../storagegroup/StorageGroupProcessorTest.java | 4 +-
33 files changed, 720 insertions(+), 820 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c5cd266..0b0fe68 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,7 +25,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MergeFileStrategy;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -375,7 +375,7 @@ public class IoTDBConfig {
*/
private int chunkMergePointThreshold = 20480;
- private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
+ private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.INPLACE_MAX_SERIES_NUM;
/**
* Default system file storage is in local file system (unsupported)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxFileMergeFileSelector.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
index a060634..97e4551 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxFileMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
@@ -17,19 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.inplace.selector;
+package org.apache.iotdb.db.engine.merge;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.IFileQueryMemMeasurement;
-import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
@@ -37,24 +32,19 @@ import org.apache.iotdb.db.utils.MergeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be
- * merged without exceeding given memory budget. It always assume the number of timeseries being
- * queried at the same time is 1 to maximize the number of file merged.
- */
-public class MaxFileMergeFileSelector implements IMergeFileSelector {
+public abstract class BaseFileSelector implements IMergeFileSelector{
- private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class);
+ private static final Logger logger = LoggerFactory.getLogger(BaseFileSelector.class);
private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
- MergeResource resource;
-
- long totalCost;
- private long memoryBudget;
- private long maxSeqFileCost;
+ protected MergeResource resource;
+ protected long totalCost;
+ protected long memoryBudget;
// the number of timeseries being queried at the same time
- int concurrentMergeNum = 1;
+ protected int concurrentMergeNum = 1;
+ protected long tempMaxSeqFileCost;
+ protected long maxSeqFileCost;
/**
* Total metadata size of each file.
@@ -65,47 +55,15 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
*/
private Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
- List<TsFileResource> selectedUnseqFiles;
- List<TsFileResource> selectedSeqFiles;
-
- private Collection<Integer> tmpSelectedSeqFiles;
- private long tempMaxSeqFileCost;
+ protected List<TsFileResource> selectedUnseqFiles;
+ protected List<TsFileResource> selectedSeqFiles;
- private boolean[] seqSelected;
- private int seqSelectedNum;
+ protected int seqSelectedNum;
- public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) {
- this.resource = resource;
- this.memoryBudget = memoryBudget;
- }
+ protected TmpSelectedSeqIterable tmpSelectedSeqIterable;
- /**
- * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget.
- * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles
- * as newly-added candidates and computes their estimated memory cost. If the current cost
- * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as
- * candidates, otherwise go to the next iteration.
- * The memory cost of a file is calculated in two ways:
- * The rough estimation: for a seqFile, the size of its metadata is used for estimation.
- * Since in the worst case, the file only contains one timeseries and all its metadata will
- * be loaded into memory with at most one actual data chunk (which is negligible) and writing
- * the timeseries into a new file generate metadata of the similar size, so the size of all
- * seqFiles' metadata (generated when writing new chunks) pluses the largest one (loaded
- * when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for
- * an unseqFile, since the merge reader may read all chunks of a series to perform a merge
- * read, the whole file may be loaded into memory, so we use the file's length as the
- * maximum estimation.
- * The tight estimation: based on the rough estimation, we scan the file's metadata to
- * count the number of chunks for each series, find the series which have the most
- * chunks in the file and use its chunk proportion to refine the rough estimation.
- * The rough estimation is performed first, if no candidates can be found using rough
- * estimation, we run the selection again with tight estimation.
- * @return two lists of TsFileResource, the former is selected seqFiles and the latter is
- * selected unseqFiles or an empty array if there are no proper candidates by the budget.
- * @throws MergeException
- */
@Override
- public List[] select() throws MergeException {
+ public void select() throws MergeException {
long startTime = System.currentTimeMillis();
try {
logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
@@ -117,9 +75,9 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
resource.setSeqFiles(selectedSeqFiles);
resource.setUnseqFiles(selectedUnseqFiles);
resource.removeOutdatedSeqReaders();
- if (selectedUnseqFiles.isEmpty()) {
+ if (selectedUnseqFiles.isEmpty() && selectedSeqFiles.isEmpty()) {
logger.info("No merge candidates are found");
- return new List[0];
+ return;
}
} catch (IOException e) {
throw new MergeException(e);
@@ -130,16 +88,13 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost,
System.currentTimeMillis() - startTime);
}
- return new List[]{selectedSeqFiles, selectedUnseqFiles};
}
- void select(boolean useTightBound) throws IOException {
- tmpSelectedSeqFiles = new HashSet<>();
- seqSelected = new boolean[resource.getSeqFiles().size()];
+ public void select(boolean useTightBound) throws IOException {
seqSelectedNum = 0;
selectedSeqFiles = new ArrayList<>();
selectedUnseqFiles = new ArrayList<>();
- maxSeqFileCost = 0;
+ long maxSeqFileCost = 0;
tempMaxSeqFileCost = 0;
totalCost = 0;
@@ -158,82 +113,81 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
selectOverlappedSeqFiles(unseqFile);
tempMaxSeqFileCost = maxSeqFileCost;
- long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
- startTime, timeLimit) :
- calculateLooseMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit);
-
- if (totalCost + newCost < memoryBudget) {
- selectedUnseqFiles.add(unseqFile);
- maxSeqFileCost = tempMaxSeqFileCost;
-
- for (Integer seqIdx : tmpSelectedSeqFiles) {
- seqSelected[seqIdx] = true;
- seqSelectedNum++;
- selectedSeqFiles.add(resource.getSeqFiles().get(seqIdx));
- }
- totalCost += newCost;
- logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
- + " cost {}",
- unseqFile, tmpSelectedSeqFiles, newCost, totalCost);
- }
- tmpSelectedSeqFiles.clear();
+ long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, startTime,
+ timeLimit) :
+ calculateLooseMemoryCost(unseqFile, startTime, timeLimit);
+ updateCost(newCost, unseqFile);
+
unseqIndex++;
timeConsumption = System.currentTimeMillis() - startTime;
}
}
- private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
- if (seqSelectedNum == resource.getSeqFiles().size()) {
- return;
+ protected abstract void selectOverlappedSeqFiles(TsFileResource unseqFile);
+
+ protected abstract void updateCost(long newCost, TsFileResource unseqFile);
+
+ private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
+ Long cost = fileMetaSizeMap.get(seqFile);
+ if (cost == null) {
+ cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
+ fileMetaSizeMap.put(seqFile, cost);
+ logger.debug(LOG_FILE_COST, seqFile, cost);
}
- int tmpSelectedNum = 0;
- for (Entry<String, Long> deviceStartTimeEntry : unseqFile.getStartTimeMap().entrySet()) {
- String deviceId = deviceStartTimeEntry.getKey();
- Long unseqStartTime = deviceStartTimeEntry.getValue();
- Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId);
-
- boolean noMoreOverlap = false;
- for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
- TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (seqSelected[i] || !seqFile.getEndTimeMap().containsKey(deviceId)) {
- continue;
- }
- Long seqEndTime = seqFile.getEndTimeMap().get(deviceId);
- if (unseqEndTime <= seqEndTime) {
- // the unseqFile overlaps current seqFile
- tmpSelectedSeqFiles.add(i);
- tmpSelectedNum ++;
- // the device of the unseqFile can not merge with later seqFiles
- noMoreOverlap = true;
- } else if (unseqStartTime <= seqEndTime) {
- // the device of the unseqFile may merge with later seqFiles
- // and the unseqFile overlaps current seqFile
- tmpSelectedSeqFiles.add(i);
- tmpSelectedNum++;
- }
- }
- if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
- break;
- }
+ return cost;
+ }
+
+ private long calculateTightFileMemoryCost(TsFileResource seqFile,
+ IFileQueryMemMeasurement measurement)
+ throws IOException {
+ Long cost = maxSeriesQueryCostMap.get(seqFile);
+ if (cost == null) {
+ long[] chunkNums = MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
+ long totalChunkNum = chunkNums[0];
+ long maxChunkNum = chunkNums[1];
+ cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
+ maxSeriesQueryCostMap.put(seqFile, cost);
+ logger.debug(LOG_FILE_COST, seqFile, cost);
}
+ return cost;
+ }
+
+ // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
+ // its proportion to all series to get a maximum estimation
+ private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
+ long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
+ long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
+ long maxCost = calculateMetadataSize(seqFile);
+ return Math.min(multiSeriesCost, maxCost);
+ }
+
+ // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
+ // its proportion among all series to get a maximum estimation
+ private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
+ long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getFileSize);
+ long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
+ long maxCost = unseqFile.getFileSize();
+ return Math.min(multiSeriesCost, maxCost);
}
private long calculateMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, IFileQueryMemMeasurement unseqMeasurement,
+ IFileQueryMemMeasurement unseqMeasurement,
IFileQueryMemMeasurement seqMeasurement, long startTime, long timeLimit) throws IOException {
long cost = 0;
Long fileCost = unseqMeasurement.measure(tmpSelectedUnseqFile);
cost += fileCost;
- for (Integer seqFileIdx : tmpSelectedSeqFiles) {
+ for (Integer seqFileIdx : tmpSelectedSeqIterable) {
TsFileResource seqFile = resource.getSeqFiles().get(seqFileIdx);
fileCost = seqMeasurement.measure(seqFile);
if (fileCost > tempMaxSeqFileCost) {
+ // memory used when read data from a seq file:
// only one file will be read at the same time, so only the largest one is recorded here
cost -= tempMaxSeqFileCost;
cost += fileCost;
tempMaxSeqFileCost = fileCost;
}
+ // memory used to cache the metadata before the new file is closed
// but writing data into a new file may generate the same amount of metadata in memory
cost += calculateMetadataSize(seqFile);
long timeConsumption = System.currentTimeMillis() - startTime;
@@ -245,61 +199,47 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
}
private long calculateLooseMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
- return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
+ long startTime, long timeLimit) throws IOException {
+ return calculateMemoryCost(tmpSelectedUnseqFile,
TsFileResource::getFileSize, this::calculateMetadataSize, startTime, timeLimit);
}
private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
- return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
+ long startTime, long timeLimit) throws IOException {
+ return calculateMemoryCost(tmpSelectedUnseqFile,
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime, timeLimit);
}
- private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
- Long cost = fileMetaSizeMap.get(seqFile);
- if (cost == null) {
- cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
- fileMetaSizeMap.put(seqFile, cost);
- logger.debug(LOG_FILE_COST, seqFile, cost);
- }
- return cost;
+ @Override
+ public void setConcurrentMergeNum(int concurrentMergeNum) {
+ this.concurrentMergeNum = concurrentMergeNum;
}
- private long calculateTightFileMemoryCost(TsFileResource seqFile, IFileQueryMemMeasurement measurement)
- throws IOException {
- Long cost = maxSeriesQueryCostMap.get(seqFile);
- if (cost == null) {
- long[] chunkNums = MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
- long totalChunkNum = chunkNums[0];
- long maxChunkNum = chunkNums[1];
- cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
- maxSeriesQueryCostMap.put(seqFile, cost);
- logger.debug(LOG_FILE_COST, seqFile, cost);
- }
- return cost;
+ @Override
+ public MergeResource getResource() {
+ return resource;
}
- // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
- // its proportion to all series to get a maximum estimation
- private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
- long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
- long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
- long maxCost = calculateMetadataSize(seqFile);
- return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
+ @Override
+ public List<TsFileResource> getSelectedSeqFiles() {
+ return selectedSeqFiles;
}
- // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
- // its proportion among all series to get a maximum estimation
- private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
- long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getFileSize);
- long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
- long maxCost = unseqFile.getFileSize();
- return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
+ @Override
+ public List<TsFileResource> getSelectedUnseqFiles() {
+ return selectedUnseqFiles;
+ }
+
+ @Override
+ public long getTotalCost() {
+ return totalCost;
}
@Override
public int getConcurrentMergeNum() {
return concurrentMergeNum;
}
-}
+
+ protected abstract static class TmpSelectedSeqIterable implements Iterable<Integer> {
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/IMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/IMergeFileSelector.java
index e85c4df..f260095 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/IMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/IMergeFileSelector.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.engine.merge;
+import java.io.IOException;
import java.util.List;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
/**
@@ -28,7 +31,44 @@ import org.apache.iotdb.db.exception.MergeException;
*/
public interface IMergeFileSelector {
- List[] select() throws MergeException;
+ /**
+ * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget.
+ * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles
+ * as newly-added candidates and computes their estimated memory cost. If the current cost
+ * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as
+ * candidates, otherwise go to the next iteration.
+ * The memory cost of a file is calculated in two ways:
+ * The rough estimation: for a seqFile, the size of its metadata is used for estimation.
+ * Since in the worst case, the file only contains one timeseries and all its metadata will
+ * be loaded into memory with at most one actual data chunk (which is negligible) and writing
+ * the timeseries into a new file generate metadata of the similar size, so the size of all
+ * seqFiles' metadata (generated when writing new chunks) pluses the largest one (loaded
+ * when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for
+ * an unseqFile, since the merge reader may read all chunks of a series to perform a merge
+ * read, the whole file may be loaded into memory, so we use the file's length as the
+ * maximum estimation.
+ * The tight estimation: based on the rough estimation, we scan the file's metadata to
+ * count the number of chunks for each series, find the series which have the most
+ * chunks in the file and use its chunk proportion to refine the rough estimation.
+ * The rough estimation is performed first, if no candidates can be found using rough
+ * estimation, we run the selection again with tight estimation.
+ * @return two lists of TsFileResource, the former is selected seqFiles and the latter is
+ * selected unseqFiles or an empty array if there are no proper candidates by the budget.
+ * @throws MergeException
+ */
+ void select() throws MergeException;
+
+ void select(boolean useTightBound) throws MergeException, IOException;
int getConcurrentMergeNum();
+
+ void setConcurrentMergeNum(int concurrentMergeNum);
+
+ MergeResource getResource();
+
+ List<TsFileResource> getSelectedSeqFiles();
+
+ List<TsFileResource> getSelectedUnseqFiles();
+
+ long getTotalCost();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MergeFileStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MergeFileStrategy.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
index 637ff72..1ae7e21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MergeFileStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/IRecoverMergeTask.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.inplace.selector;
+package org.apache.iotdb.db.engine.merge;
-public enum MergeFileStrategy {
- MAX_SERIES_NUM,
- MAX_FILE_NUM,
- // TODO: HOW?
- TRADE_OFF,
+import java.io.IOException;
+import org.apache.iotdb.db.exception.MetadataErrorException;
+
+public interface IRecoverMergeTask {
+ void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxSeriesMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
similarity index 54%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxSeriesMergeFileSelector.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
index 621bdd6..7278d09 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxSeriesMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.squeeze.selector;
+package org.apache.iotdb.db.engine.merge;
import java.io.IOException;
import java.util.Collections;
@@ -32,9 +32,12 @@ import org.slf4j.LoggerFactory;
* MaxSeriesMergeFileSelector is an extension of IMergeFileSelector which tries to maximize the
* number of timeseries that can be merged at the same time.
*/
-public class MaxSeriesMergeFileSelector extends MaxFileMergeFileSelector {
+public class MaxSeriesMergeFileSelector<T extends IMergeFileSelector> implements IMergeFileSelector {
- public static final int MAX_SERIES_NUM = 1024;
+ private T baseSelector;
+ private MergeResource resource;
+
+ public static final int MAX_SERIES_NUM = 10240;
private static final Logger logger = LoggerFactory.getLogger(
MaxSeriesMergeFileSelector.class);
@@ -42,26 +45,32 @@ public class MaxSeriesMergeFileSelector extends MaxFileMergeFileSelector {
private List<TsFileResource> lastSelectedUnseqFiles = Collections.emptyList();
private long lastTotalMemoryCost;
- public MaxSeriesMergeFileSelector(
- MergeResource mergeResource,
- long memoryBudget) {
- super(mergeResource, memoryBudget);
+ private int concurrentMergeNum;
+ private long totalCost;
+
+
+ public MaxSeriesMergeFileSelector(T baseSelector) {
+ this.baseSelector = baseSelector;
+ this.resource = baseSelector.getResource();
}
@Override
- public List[] select() throws MergeException {
+ public void select() throws MergeException {
long startTime = System.currentTimeMillis();
try {
- logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles", resource.getSeqFiles().size(),
+ logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
+ resource.getSeqFiles().size(),
resource.getUnseqFiles().size());
searchMaxSeriesNum();
+ List<TsFileResource> selectedSeqFiles = baseSelector.getSelectedSeqFiles();
+ List<TsFileResource> selectedUnseqFiles = baseSelector.getSelectedUnseqFiles();
resource.setSeqFiles(selectedSeqFiles);
resource.setUnseqFiles(selectedUnseqFiles);
resource.removeOutdatedSeqReaders();
- if (selectedUnseqFiles.isEmpty()) {
+ if (selectedUnseqFiles.isEmpty() && selectedSeqFiles.isEmpty()) {
logger.info("No merge candidates are found");
- return new List[0];
+ return;
}
} catch (IOException e) {
throw new MergeException(e);
@@ -69,17 +78,52 @@ public class MaxSeriesMergeFileSelector extends MaxFileMergeFileSelector {
if (logger.isInfoEnabled()) {
logger.info("Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
+ "concurrent merge num {}" + "time consumption {}ms",
- selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost, concurrentMergeNum,
+ resource.getSeqFiles().size(), resource.getUnseqFiles().size(), baseSelector.getTotalCost(),
+ baseSelector.getConcurrentMergeNum(),
System.currentTimeMillis() - startTime);
}
- return new List[]{selectedSeqFiles, selectedUnseqFiles};
}
- private void searchMaxSeriesNum() throws IOException {
+ @Override
+ public void select(boolean useTightBound) throws MergeException {
+ select();
+ }
+
+ @Override
+ public int getConcurrentMergeNum() {
+ return concurrentMergeNum;
+ }
+
+ @Override
+ public void setConcurrentMergeNum(int concurrentMergeNum) {
+
+ }
+
+ @Override
+ public MergeResource getResource() {
+ return resource;
+ }
+
+ @Override
+ public List<TsFileResource> getSelectedSeqFiles() {
+ return lastSelectedSeqFiles;
+ }
+
+ @Override
+ public List<TsFileResource> getSelectedUnseqFiles() {
+ return lastSelectedUnseqFiles;
+ }
+
+ @Override
+ public long getTotalCost() {
+ return totalCost;
+ }
+
+ private void searchMaxSeriesNum() throws IOException, MergeException {
binSearch();
}
- private void binSearch() throws IOException {
+ private void binSearch() throws IOException, MergeException {
int lb = 0;
int ub = MAX_SERIES_NUM + 1;
while (true) {
@@ -87,22 +131,20 @@ public class MaxSeriesMergeFileSelector extends MaxFileMergeFileSelector {
if (mid == lb) {
break;
}
- concurrentMergeNum = mid;
- select(false);
- if (selectedUnseqFiles.isEmpty()) {
- select(true);
+ baseSelector.setConcurrentMergeNum(mid);
+ baseSelector.select(false);
+ if (baseSelector.getSelectedUnseqFiles().isEmpty() && baseSelector.getSelectedSeqFiles().isEmpty()) {
+ baseSelector.select(true);
}
- if (selectedUnseqFiles.isEmpty()) {
+ if (baseSelector.getSelectedUnseqFiles().isEmpty() && baseSelector.getSelectedSeqFiles().isEmpty()) {
ub = mid;
} else {
- lastSelectedSeqFiles = selectedSeqFiles;
- lastSelectedUnseqFiles = selectedUnseqFiles;
- lastTotalMemoryCost = totalCost;
+ lastSelectedSeqFiles = baseSelector.getSelectedSeqFiles();
+ lastSelectedUnseqFiles = baseSelector.getSelectedUnseqFiles();
+ lastTotalMemoryCost = baseSelector.getTotalCost();
lb = mid;
}
}
- selectedUnseqFiles = lastSelectedUnseqFiles;
- selectedSeqFiles = lastSelectedSeqFiles;
concurrentMergeNum = lb;
totalCost = lastTotalMemoryCost;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileStrategy.java
new file mode 100644
index 0000000..a6621c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.inplace.selector.InplaceMaxFileSelector;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.squeeze.selector.SqueezeMaxFileSelector;
+import org.apache.iotdb.db.engine.merge.squeeze.task.SqueezeMergeTask;
+
+public enum MergeFileStrategy {
+ INPLACE_MAX_SERIES_NUM,
+ INPLACE_MAX_FILE_NUM,
+ SQUEEZE_MAX_SERIES_NUM,
+ SQUEEZE_MAX_FILE_NUM;
+ // TODO new strategies?
+
+ public IMergeFileSelector getFileSelector(MergeResource resource, long budget) {
+ switch (this) {
+ case INPLACE_MAX_FILE_NUM:
+ return new InplaceMaxFileSelector(resource, budget);
+ case SQUEEZE_MAX_FILE_NUM:
+ return new SqueezeMaxFileSelector(resource, budget);
+ case INPLACE_MAX_SERIES_NUM:
+ return new MaxSeriesMergeFileSelector<>(new InplaceMaxFileSelector(resource, budget));
+ case SQUEEZE_MAX_SERIES_NUM:
+ return new MaxSeriesMergeFileSelector<>(new SqueezeMaxFileSelector(resource, budget));
+ }
+ return null;
+ }
+
+ public Callable<Void> getMergeTask(MergeResource mergeResource, String storageGroupSysDir,
+ MergeCallback callback,
+ String taskName, int concurrentMergeSeriesNum, String storageGroupName,
+ boolean isFullMerge) {
+ switch (this) {
+ case SQUEEZE_MAX_SERIES_NUM:
+ case SQUEEZE_MAX_FILE_NUM:
+ return new SqueezeMergeTask(mergeResource, storageGroupSysDir, callback, taskName,
+ concurrentMergeSeriesNum, storageGroupName);
+ case INPLACE_MAX_SERIES_NUM:
+ case INPLACE_MAX_FILE_NUM:
+ return new InplaceMergeTask(mergeResource, storageGroupSysDir, callback, taskName,
+ isFullMerge, concurrentMergeSeriesNum, storageGroupName);
+ }
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/NaivePathSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/NaivePathSelector.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/NaivePathSelector.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/NaivePathSelector.java
index 6fa9bca..6eb9a02 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/NaivePathSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/NaivePathSelector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.squeeze.selector;
+package org.apache.iotdb.db.engine.merge;
import java.util.List;
import java.util.NoSuchElementException;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/InplaceMergeLogger.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/InplaceMergeLogger.java
index d4a6b51..5b3d807 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/InplaceMergeLogger.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
/**
* MergeLogger records the progress of a merge in file "merge.log" as text lines.
*/
-public class MergeLogger {
+public class InplaceMergeLogger {
public static final String MERGE_LOG_NAME = "merge.log.inplace";
@@ -47,7 +47,7 @@ public class MergeLogger {
private BufferedWriter logStream;
- public MergeLogger(String storageGroupDir) throws IOException {
+ public InplaceMergeLogger(String storageGroupDir) throws IOException {
logStream = new BufferedWriter(new FileWriter(SystemFileFactory.INSTANCE.getFile(storageGroupDir,
MERGE_LOG_NAME), true));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
index 73ba34e..9f3a44b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.engine.merge.inplace.recover;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.*;
+import static org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger.*;
import java.io.BufferedReader;
import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/InplaceMaxFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/InplaceMaxFileSelector.java
new file mode 100644
index 0000000..7e80490
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/InplaceMaxFileSelector.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.inplace.selector;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.merge.BaseFileSelector;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be
+ * merged without exceeding given memory budget. It always assume the number of timeseries being
+ * queried at the same time is 1 to maximize the number of file merged.
+ */
+public class InplaceMaxFileSelector extends BaseFileSelector {
+
+ private static final Logger logger = LoggerFactory.getLogger(InplaceMaxFileSelector.class);
+
+ private Collection<Integer> tmpSelectedSeqFiles;
+
+ private boolean[] seqSelected;
+
+ public InplaceMaxFileSelector(MergeResource resource, long memoryBudget) {
+ this.resource = resource;
+ this.memoryBudget = memoryBudget;
+ this.tmpSelectedSeqIterable = new ListTmpSeqIter();
+ }
+
+ @Override
+ public void select(boolean useTightBound) throws IOException {
+ tmpSelectedSeqFiles = new HashSet<>();
+ seqSelected = new boolean[resource.getSeqFiles().size()];
+ super.select(useTightBound);
+ }
+
+ @Override
+ protected void updateCost(long newCost, TsFileResource unseqFile) {
+ if (totalCost + newCost < memoryBudget) {
+ selectedUnseqFiles.add(unseqFile);
+ maxSeqFileCost = tempMaxSeqFileCost;
+
+ for (Integer seqIdx : tmpSelectedSeqFiles) {
+ seqSelected[seqIdx] = true;
+ seqSelectedNum++;
+ selectedSeqFiles.add(resource.getSeqFiles().get(seqIdx));
+ }
+ totalCost += newCost;
+ logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
+ + " cost {}",
+ unseqFile, tmpSelectedSeqFiles, newCost, totalCost);
+ }
+ tmpSelectedSeqFiles.clear();
+ }
+
+ protected void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+ if (seqSelectedNum == resource.getSeqFiles().size()) {
+ return;
+ }
+ int tmpSelectedNum = 0;
+ for (Entry<String, Long> deviceStartTimeEntry : unseqFile.getStartTimeMap().entrySet()) {
+ String deviceId = deviceStartTimeEntry.getKey();
+ Long unseqStartTime = deviceStartTimeEntry.getValue();
+ Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId);
+
+ boolean noMoreOverlap = false;
+ for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
+ TsFileResource seqFile = resource.getSeqFiles().get(i);
+ if (seqSelected[i] || !seqFile.getEndTimeMap().containsKey(deviceId)) {
+ continue;
+ }
+ Long seqEndTime = seqFile.getEndTimeMap().get(deviceId);
+ if (unseqEndTime <= seqEndTime) {
+ // the unseqFile overlaps current seqFile
+ if (!tmpSelectedSeqFiles.contains(i)) {
+ tmpSelectedSeqFiles.add(i);
+ tmpSelectedNum ++;
+ }
+ // the device of the unseqFile can not merge with later seqFiles
+ noMoreOverlap = true;
+ } else if (unseqStartTime <= seqEndTime) {
+ // the device of the unseqFile may merge with later seqFiles
+ // and the unseqFile overlaps current seqFile
+ if (!tmpSelectedSeqFiles.contains(i)) {
+ tmpSelectedSeqFiles.add(i);
+ tmpSelectedNum ++;
+ }
+ }
+ }
+ if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
+ break;
+ }
+ }
+ }
+
+
+
+ protected class ListTmpSeqIter extends TmpSelectedSeqIterable {
+
+ @Override
+ public Iterator<Integer> iterator() {
+ return tmpSelectedSeqFiles.iterator();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxSeriesMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxSeriesMergeFileSelector.java
deleted file mode 100644
index 3b710f2..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/MaxSeriesMergeFileSelector.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.merge.inplace.selector;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MaxSeriesMergeFileSelector is an extension of IMergeFileSelector which tries to maximize the
- * number of timeseries that can be merged at the same time.
- */
-public class MaxSeriesMergeFileSelector extends MaxFileMergeFileSelector {
-
- public static final int MAX_SERIES_NUM = 1024;
- private static final Logger logger = LoggerFactory.getLogger(MaxSeriesMergeFileSelector.class);
-
- private List<TsFileResource> lastSelectedSeqFiles = Collections.emptyList();
- private List<TsFileResource> lastSelectedUnseqFiles = Collections.emptyList();
- private long lastTotalMemoryCost;
-
- public MaxSeriesMergeFileSelector(
- MergeResource mergeResource,
- long memoryBudget) {
- super(mergeResource, memoryBudget);
- }
-
- @Override
- public List[] select() throws MergeException {
- long startTime = System.currentTimeMillis();
- try {
- logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles", resource.getSeqFiles().size(),
- resource.getUnseqFiles().size());
-
- searchMaxSeriesNum();
- resource.setSeqFiles(selectedSeqFiles);
- resource.setUnseqFiles(selectedUnseqFiles);
- resource.removeOutdatedSeqReaders();
- if (selectedUnseqFiles.isEmpty()) {
- logger.info("No merge candidates are found");
- return new List[0];
- }
- } catch (IOException e) {
- throw new MergeException(e);
- }
- if (logger.isInfoEnabled()) {
- logger.info("Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
- + "concurrent merge num {}" + "time consumption {}ms",
- selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost, concurrentMergeNum,
- System.currentTimeMillis() - startTime);
- }
- return new List[]{selectedSeqFiles, selectedUnseqFiles};
- }
-
- private void searchMaxSeriesNum() throws IOException {
- binSearch();
- }
-
- private void binSearch() throws IOException {
- int lb = 0;
- int ub = MAX_SERIES_NUM + 1;
- while (true) {
- int mid = (lb + ub) / 2;
- if (mid == lb) {
- break;
- }
- concurrentMergeNum = mid;
- select(false);
- if (selectedUnseqFiles.isEmpty()) {
- select(true);
- }
- if (selectedUnseqFiles.isEmpty()) {
- ub = mid;
- } else {
- lastSelectedSeqFiles = selectedSeqFiles;
- lastSelectedUnseqFiles = selectedUnseqFiles;
- lastTotalMemoryCost = totalCost;
- lb = mid;
- }
- }
- selectedUnseqFiles = lastSelectedUnseqFiles;
- selectedSeqFiles = lastSelectedSeqFiles;
- concurrentMergeNum = lb;
- totalCost = lastTotalMemoryCost;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/NaivePathSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/NaivePathSelector.java
deleted file mode 100644
index 161e4a4..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/selector/NaivePathSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.merge.inplace.selector;
-
-import java.util.List;
-import java.util.NoSuchElementException;
-import org.apache.iotdb.db.engine.merge.IMergePathSelector;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-public class NaivePathSelector implements IMergePathSelector {
-
- private List<Path> paths;
- private int idx;
- private int maxSeriesNum;
-
- public NaivePathSelector(List<Path> paths, int maxSeriesNum) {
- this.paths = paths;
- this.maxSeriesNum = maxSeriesNum;
- }
-
- @Override
- public boolean hasNext() {
- return idx < paths.size();
- }
-
- @Override
- public List<Path> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- List<Path> ret = idx + maxSeriesNum <= paths.size() ? paths.subList(idx, idx + maxSeriesNum) :
- paths.subList(idx, paths.size());
- idx += maxSeriesNum;
- return ret;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
index 11ad188..8f747e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.merge.MergeCallback;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -55,7 +55,7 @@ public class InplaceMergeTask implements Callable<Void> {
MergeResource resource;
String storageGroupSysDir;
String storageGroupName;
- MergeLogger mergeLogger;
+ InplaceMergeLogger mergeLogger;
MergeContext mergeContext = new MergeContext();
private MergeCallback callback;
@@ -96,7 +96,7 @@ public class InplaceMergeTask implements Callable<Void> {
// call the callback to make sure the StorageGroup exit merging status, but passing 2
// empty file lists to avoid files being deleted.
callback.call(Collections.emptyList(), Collections.emptyList(),
- SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME), null);
+ SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, InplaceMergeLogger.MERGE_LOG_NAME), null);
throw e;
}
return null;
@@ -110,7 +110,7 @@ public class InplaceMergeTask implements Callable<Void> {
long startTime = System.currentTimeMillis();
long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
resource.getUnseqFiles());
- mergeLogger = new MergeLogger(storageGroupSysDir);
+ mergeLogger = new InplaceMergeLogger(storageGroupSysDir);
mergeLogger.logFiles(resource);
@@ -165,7 +165,7 @@ public class InplaceMergeTask implements Callable<Void> {
mergeFile.delete();
}
- File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, InplaceMergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
// make sure merge.log is not deleted until unseqFiles are cleared so that when system
// reboots, the undeleted files can be deleted again
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
index cb74a32..40d186b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeFileTask.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -54,11 +54,11 @@ class MergeFileTask {
private String taskName;
private MergeContext context;
- private MergeLogger mergeLogger;
+ private InplaceMergeLogger mergeLogger;
private MergeResource resource;
private List<TsFileResource> unmergedFiles;
- MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+ MergeFileTask(String taskName, MergeContext context, InplaceMergeLogger mergeLogger,
MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
this.taskName = taskName;
this.context = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
index 992e5ba..1d6b222 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
@@ -35,9 +35,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.merge.IMergePathSelector;
-import org.apache.iotdb.db.engine.merge.inplace.selector.NaivePathSelector;
+import org.apache.iotdb.db.engine.merge.NaivePathSelector;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -64,7 +64,7 @@ class MergeMultiChunkTask {
private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
.getChunkMergePointThreshold();
- private MergeLogger mergeLogger;
+ private InplaceMergeLogger mergeLogger;
private List<Path> unmergedSeries;
private String taskName;
@@ -82,7 +82,7 @@ class MergeMultiChunkTask {
private int concurrentMergeSeriesNum;
private List<Path> currMergingPaths = new ArrayList<>();
- MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
+ MergeMultiChunkTask(MergeContext context, String taskName, InplaceMergeLogger mergeLogger,
MergeResource mergeResource, boolean fullMerge, List<Path> unmergedSeries,
int concurrentMergeSeriesNum) {
this.mergeContext = context;
@@ -297,8 +297,7 @@ class MergeMultiChunkTask {
// this only happens when the seqFiles do not contain this series, otherwise the remaining
// data will be merged with the last chunk in the seqFiles
if (isLastFile && currTimeValuePairs[pathIdx] != null) {
- ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE,
- pathIdx);
+ ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], pathIdx);
mergedChunkNum.incrementAndGet();
}
// the last merged chunk may still be smaller than the threshold, flush it anyway
@@ -385,10 +384,9 @@ class MergeMultiChunkTask {
}
private int writeRemainingUnseq(IChunkWriter chunkWriter,
- IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException {
+ IPointReader unseqReader, int pathIdx) throws IOException {
int ptWritten = 0;
- while (currTimeValuePairs[pathIdx] != null
- && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
+ while (currTimeValuePairs[pathIdx] != null) {
writeTVPair(currTimeValuePairs[pathIdx], chunkWriter);
ptWritten++;
unseqReader.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
index 2c3a928..e6a79b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
@@ -29,11 +29,12 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.IRecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.MaxSeriesMergeFileSelector;
import org.apache.iotdb.db.engine.merge.MergeCallback;
import org.apache.iotdb.db.engine.merge.inplace.recover.LogAnalyzer;
import org.apache.iotdb.db.engine.merge.inplace.recover.LogAnalyzer.Status;
-import org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.utils.MergeUtils;
@@ -47,7 +48,7 @@ import org.slf4j.LoggerFactory;
* RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
* scanning merge.log using LogAnalyzer and continue the unfinished merge.
*/
-public class RecoverInplaceMergeTask extends InplaceMergeTask {
+public class RecoverInplaceMergeTask extends InplaceMergeTask implements IRecoverMergeTask {
private static final Logger logger = LoggerFactory.getLogger(RecoverInplaceMergeTask.class);
@@ -61,7 +62,7 @@ public class RecoverInplaceMergeTask extends InplaceMergeTask {
}
public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
- File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, InplaceMergeLogger.MERGE_LOG_NAME);
if (!logFile.exists()) {
logger.info("{} no merge.log, merge recovery ends", taskName);
return;
@@ -133,7 +134,7 @@ public class RecoverInplaceMergeTask extends InplaceMergeTask {
}
private void resumeMergeProgress() throws IOException {
- mergeLogger = new MergeLogger(storageGroupSysDir);
+ mergeLogger = new InplaceMergeLogger(storageGroupSysDir);
truncateFiles();
recoverChunkCounts();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 93223f5..8dcdb9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -56,7 +56,7 @@ public class MergeManager implements IService {
return INSTANCE;
}
- public void submitMainTask(InplaceMergeTask mergeTask) {
+ public void submitMainTask(Callable mergeTask) {
mergeTaskPool.submit(mergeTask);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
index 0354513..e0564bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.engine.merge.squeeze.recover;
-import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_ALL_TS_END;
-import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_SEQ_FILES;
-import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_UNSEQ_FILES;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger.STR_ALL_TS_END;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger.STR_SEQ_FILES;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger.STR_UNSEQ_FILES;
import java.io.BufferedReader;
import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/SqueezeMergeLogger.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/SqueezeMergeLogger.java
index 1f99557..2b2de1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/SqueezeMergeLogger.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
/**
* MergeLogger records the progress of a merge in file "merge.log" as text lines.
*/
-public class MergeLogger {
+public class SqueezeMergeLogger {
public static final String MERGE_LOG_NAME = "merge.log.squeeze";
@@ -42,7 +42,7 @@ public class MergeLogger {
private BufferedWriter logStream;
- public MergeLogger(String storageGroupDir) throws IOException {
+ public SqueezeMergeLogger(String storageGroupDir) throws IOException {
logStream = new BufferedWriter(new FileWriter(SystemFileFactory.INSTANCE.getFile(storageGroupDir,
MERGE_LOG_NAME), true));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxFileMergeFileSelector.java
deleted file mode 100644
index f509fa9..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MaxFileMergeFileSelector.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.merge.squeeze.selector;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.IFileQueryMemMeasurement;
-import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.utils.MergeUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be
- * merged without exceeding given memory budget. It always assume the number of timeseries being
- * queried at the same time is 1 to maximize the number of file merged.
- */
-public class MaxFileMergeFileSelector implements IMergeFileSelector {
-
- private static final Logger logger = LoggerFactory.getLogger(
- MaxFileMergeFileSelector.class);
- private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
-
- MergeResource resource;
-
- long totalCost;
- private long memoryBudget;
- private long maxSeqFileCost;
-
- // the number of timeseries being queried at the same time
- int concurrentMergeNum = 1;
-
- /**
- * Total metadata size of each file.
- */
- private Map<TsFileResource, Long> fileMetaSizeMap = new HashMap<>();
- /**
- * Maximum memory cost of querying a timeseries in each file.
- */
- private Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
-
- List<TsFileResource> selectedUnseqFiles;
- List<TsFileResource> selectedSeqFiles;
-
- private Collection<Integer> tmpSelectedSeqFiles;
- private long tempMaxSeqFileCost;
-
- private boolean[] seqSelected;
- private int seqSelectedNum;
-
- public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) {
- this.resource = resource;
- this.memoryBudget = memoryBudget;
- }
-
- /**
- * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget.
- * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles
- * as newly-added candidates and computes their estimated memory cost. If the current cost
- * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as
- * candidates, otherwise go to the next iteration.
- * The memory cost of a file is calculated in two ways:
- * The rough estimation: for a seqFile, the size of its metadata is used for estimation.
- * Since in the worst case, the file only contains one timeseries and all its metadata will
- * be loaded into memory with at most one actual data chunk (which is negligible) and writing
- * the timeseries into a new file generate metadata of the similar size, so the size of all
- * seqFiles' metadata (generated when writing new chunks) pluses the largest one (loaded
- * when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for
- * an unseqFile, since the merge reader may read all chunks of a series to perform a merge
- * read, the whole file may be loaded into memory, so we use the file's length as the
- * maximum estimation.
- * The tight estimation: based on the rough estimation, we scan the file's metadata to
- * count the number of chunks for each series, find the series which have the most
- * chunks in the file and use its chunk proportion to refine the rough estimation.
- * The rough estimation is performed first, if no candidates can be found using rough
- * estimation, we run the selection again with tight estimation.
- * @return two lists of TsFileResource, the former is selected seqFiles and the latter is
- * selected unseqFiles or an empty array if there are no proper candidates by the budget.
- * @throws MergeException
- */
- @Override
- public List[] select() throws MergeException {
- long startTime = System.currentTimeMillis();
- try {
- logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
- resource.getSeqFiles().size(), resource.getUnseqFiles().size());
- select(false);
- if (selectedUnseqFiles.isEmpty()) {
- select(true);
- }
- resource.setSeqFiles(selectedSeqFiles);
- resource.setUnseqFiles(selectedUnseqFiles);
- resource.removeOutdatedSeqReaders();
- if (selectedUnseqFiles.isEmpty()) {
- logger.info("No merge candidates are found");
- return new List[0];
- }
- } catch (IOException e) {
- throw new MergeException(e);
- }
- if (logger.isInfoEnabled()) {
- logger.info("Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
- + "time consumption {}ms",
- selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost,
- System.currentTimeMillis() - startTime);
- }
- return new List[]{selectedSeqFiles, selectedUnseqFiles};
- }
-
- void select(boolean useTightBound) throws IOException {
- tmpSelectedSeqFiles = new HashSet<>();
- seqSelected = new boolean[resource.getSeqFiles().size()];
- seqSelectedNum = 0;
- selectedSeqFiles = new ArrayList<>();
- selectedUnseqFiles = new ArrayList<>();
- maxSeqFileCost = 0;
- tempMaxSeqFileCost = 0;
-
- totalCost = 0;
-
- int unseqIndex = 0;
- long startTime = System.currentTimeMillis();
- long timeConsumption = 0;
- long timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
- if (timeLimit < 0) {
- timeLimit = Long.MAX_VALUE;
- }
- while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
- // select next unseq files
- TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
-
- selectOverlappedSeqFiles(unseqFile);
-
- tempMaxSeqFileCost = maxSeqFileCost;
- long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
- startTime, timeLimit) :
- calculateLooseMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit);
-
- if (totalCost + newCost < memoryBudget) {
- selectedUnseqFiles.add(unseqFile);
- maxSeqFileCost = tempMaxSeqFileCost;
-
- for (Integer seqIdx : tmpSelectedSeqFiles) {
- seqSelected[seqIdx] = true;
- seqSelectedNum++;
- selectedSeqFiles.add(resource.getSeqFiles().get(seqIdx));
- }
- totalCost += newCost;
- logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
- + " cost {}",
- unseqFile, tmpSelectedSeqFiles, newCost, totalCost);
- }
- tmpSelectedSeqFiles.clear();
- unseqIndex++;
- timeConsumption = System.currentTimeMillis() - startTime;
- }
- }
-
- private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
- if (seqSelectedNum == resource.getSeqFiles().size()) {
- return;
- }
- int tmpSelectedNum = 0;
- for (Entry<String, Long> deviceStartTimeEntry : unseqFile.getStartTimeMap().entrySet()) {
- String deviceId = deviceStartTimeEntry.getKey();
- Long unseqStartTime = deviceStartTimeEntry.getValue();
- Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId);
-
- boolean noMoreOverlap = false;
- for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
- TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (seqSelected[i] || !seqFile.getEndTimeMap().containsKey(deviceId)) {
- continue;
- }
- Long seqEndTime = seqFile.getEndTimeMap().get(deviceId);
- if (unseqEndTime <= seqEndTime) {
- // the unseqFile overlaps current seqFile
- tmpSelectedSeqFiles.add(i);
- tmpSelectedNum ++;
- // the device of the unseqFile can not merge with later seqFiles
- noMoreOverlap = true;
- } else if (unseqStartTime <= seqEndTime) {
- // the device of the unseqFile may merge with later seqFiles
- // and the unseqFile overlaps current seqFile
- tmpSelectedSeqFiles.add(i);
- tmpSelectedNum++;
- }
- }
- if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
- break;
- }
- }
- }
-
- private long calculateMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, IFileQueryMemMeasurement unseqMeasurement,
- IFileQueryMemMeasurement seqMeasurement, long startTime, long timeLimit) throws IOException {
- long cost = 0;
- Long fileCost = unseqMeasurement.measure(tmpSelectedUnseqFile);
- cost += fileCost;
-
- for (Integer seqFileIdx : tmpSelectedSeqFiles) {
- TsFileResource seqFile = resource.getSeqFiles().get(seqFileIdx);
- fileCost = seqMeasurement.measure(seqFile);
- if (fileCost > tempMaxSeqFileCost) {
- // only one file will be read at the same time, so only the largest one is recorded here
- cost -= tempMaxSeqFileCost;
- cost += fileCost;
- tempMaxSeqFileCost = fileCost;
- }
- // but writing data into a new file may generate the same amount of metadata in memory
- cost += calculateMetadataSize(seqFile);
- long timeConsumption = System.currentTimeMillis() - startTime;
- if (timeConsumption > timeLimit) {
- return Long.MAX_VALUE;
- }
- }
- return cost;
- }
-
- private long calculateLooseMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
- return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
- TsFileResource::getFileSize, this::calculateMetadataSize, startTime, timeLimit);
- }
-
- private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
- Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
- return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
- this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime, timeLimit);
- }
-
- private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
- Long cost = fileMetaSizeMap.get(seqFile);
- if (cost == null) {
- cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
- fileMetaSizeMap.put(seqFile, cost);
- logger.debug(LOG_FILE_COST, seqFile, cost);
- }
- return cost;
- }
-
- private long calculateTightFileMemoryCost(TsFileResource seqFile, IFileQueryMemMeasurement measurement)
- throws IOException {
- Long cost = maxSeriesQueryCostMap.get(seqFile);
- if (cost == null) {
- long[] chunkNums = MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
- long totalChunkNum = chunkNums[0];
- long maxChunkNum = chunkNums[1];
- cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
- maxSeriesQueryCostMap.put(seqFile, cost);
- logger.debug(LOG_FILE_COST, seqFile, cost);
- }
- return cost;
- }
-
- // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
- // its proportion to all series to get a maximum estimation
- private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
- long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
- long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
- long maxCost = calculateMetadataSize(seqFile);
- return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
- }
-
- // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
- // its proportion among all series to get a maximum estimation
- private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
- long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getFileSize);
- long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
- long maxCost = unseqFile.getFileSize();
- return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
- }
-
- @Override
- public int getConcurrentMergeNum() {
- return concurrentMergeNum;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MergeFileStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MergeFileStrategy.java
deleted file mode 100644
index 28fd6ff..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/MergeFileStrategy.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.merge.squeeze.selector;
-
-public enum MergeFileStrategy {
- MAX_SERIES_NUM,
- MAX_FILE_NUM,
- // TODO: HOW?
- TRADE_OFF,
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java
new file mode 100644
index 0000000..d6e3c56
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.squeeze.selector;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.merge.BaseFileSelector;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be
+ * merged without exceeding given memory budget. It always assume the number of timeseries being
+ * queried at the same time is 1 to maximize the number of file merged.
+ */
+public class SqueezeMaxFileSelector extends BaseFileSelector {
+
+ private static final Logger logger = LoggerFactory.getLogger(
+ SqueezeMaxFileSelector.class);
+
+ // the file selection of squeeze strategy is different from that of inplace strategy, consider:
+ // seqFile1 has: (device1, [1,100]) (device2, [1,100])
+ // seqFile2 has: (device1, [101,200]) (device2, [101,200])
+ // seqFile3 has: (device1, [201,300]) (device2, [201,300])
+ // unseqFile1 has: (device1, [1,100]) (device2, [201,300])
+ // When using inplace strategy, unseqFile1 will merge with seqFile1 and seqFile3 and generates
+ // 2 files which still don't overlap seqFile2.
+ // However, when using squeeze strategy, unseqFile1 must also merge with seqFile2, otherwise,
+ // the generated file will overlap seqFiles2.
+ // As a result, we must find the file that firstly overlaps the unseqFile and the file that
+ // lastly overlaps the unseqFile and merge all files in between.
+ private int firstOverlapIdx = Integer.MAX_VALUE;
+ private int lastOverlapIdx = Integer.MIN_VALUE;
+
+ private int tmpFirstOverlapIdx = Integer.MAX_VALUE;
+ private int tmpLastOverlapIdx = Integer.MIN_VALUE;
+
+ public SqueezeMaxFileSelector(MergeResource resource, long memoryBudget) {
+ this.resource = resource;
+ this.memoryBudget = memoryBudget;
+ this.tmpSelectedSeqIterable = new BoarderTmpSeqIter();
+ }
+
+ public void select(boolean useTightBound) throws IOException {
+ super.select(useTightBound);
+ for (int i = firstOverlapIdx; i < lastOverlapIdx; i++) {
+ selectedSeqFiles.add(resource.getSeqFiles().get(i));
+ }
+ }
+
+ protected void updateCost(long newCost, TsFileResource unseqFile) {
+ if (totalCost + newCost < memoryBudget) {
+ selectedUnseqFiles.add(unseqFile);
+ maxSeqFileCost = tempMaxSeqFileCost;
+
+ firstOverlapIdx = tmpFirstOverlapIdx;
+ lastOverlapIdx = tmpLastOverlapIdx;
+
+ int newSeqNum = lastOverlapIdx - firstOverlapIdx + 1;
+ int deltaSeqNum = newSeqNum - seqSelectedNum;
+ seqSelectedNum = newSeqNum;
+
+ totalCost += newCost;
+ logger.debug("Adding a new unseqFile {} and {} seqFiles as candidates, new cost {}, total"
+ + " cost {}", unseqFile, deltaSeqNum ,newCost, totalCost);
+ }
+ }
+
+ protected void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+ if (seqSelectedNum == resource.getSeqFiles().size()) {
+ return;
+ }
+
+ for (Entry<String, Long> deviceStartTimeEntry : unseqFile.getStartTimeMap().entrySet()) {
+ String deviceId = deviceStartTimeEntry.getKey();
+ Long unseqStartTime = deviceStartTimeEntry.getValue();
+ Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId);
+
+ boolean noMoreOverlap = false;
+ for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
+ TsFileResource seqFile = resource.getSeqFiles().get(i);
+ if (!seqFile.getEndTimeMap().containsKey(deviceId)) {
+ continue;
+ }
+ Long seqEndTime = seqFile.getEndTimeMap().get(deviceId);
+ if (unseqEndTime <= seqEndTime) {
+ // the unseqFile overlaps current seqFile
+ tmpFirstOverlapIdx = Math.min(firstOverlapIdx, i);
+ tmpLastOverlapIdx = Math.max(lastOverlapIdx, i);
+ // the device of the unseqFile can not merge with later seqFiles
+ noMoreOverlap = true;
+ } else if (unseqStartTime <= seqEndTime) {
+ // the device of the unseqFile may merge with later seqFiles
+ // and the unseqFile overlaps current seqFile
+ tmpFirstOverlapIdx = Math.min(firstOverlapIdx, i);
+ tmpLastOverlapIdx = Math.max(lastOverlapIdx, i);
+ }
+ }
+ }
+ }
+
+ protected class BoarderTmpSeqIter extends TmpSelectedSeqIterable {
+
+ @Override
+ public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ int _next = firstOverlapIdx;
+ @Override
+ public boolean hasNext() {
+ return _next <= lastOverlapIdx;
+ }
+
+ @Override
+ public Integer next() {
+ return _next ++;
+ }
+ };
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
index ed14c6c..31865d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
@@ -36,11 +36,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.IMergePathSelector;
+import org.apache.iotdb.db.engine.merge.NaivePathSelector;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger;
-import org.apache.iotdb.db.engine.merge.squeeze.selector.NaivePathSelector;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -69,7 +69,7 @@ class MergeSeriesTask {
private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
.getChunkMergePointThreshold();
- private MergeLogger mergeLogger;
+ private SqueezeMergeLogger mergeLogger;
private List<Path> unmergedSeries;
private String taskName;
@@ -88,7 +88,7 @@ class MergeSeriesTask {
private TsFileResource newResource;
private String currDevice = null;
- MergeSeriesTask(MergeContext context, String taskName, MergeLogger mergeLogger,
+ MergeSeriesTask(MergeContext context, String taskName, SqueezeMergeLogger mergeLogger,
MergeResource mergeResource, List<Path> unmergedSeries,
int concurrentMergeSeriesNum) {
this.mergeContext = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
index c32ffb6..1a26330 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
@@ -24,11 +24,12 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.IRecoverMergeTask;
import org.apache.iotdb.db.engine.merge.MergeCallback;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer;
import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer.Status;
-import org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
* RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
* scanning merge.log using LogAnalyzer and continue the unfinished merge.
*/
-public class RecoverSqueezeMergeTask extends SqueezeMergeTask {
+public class RecoverSqueezeMergeTask extends SqueezeMergeTask implements IRecoverMergeTask {
private static final Logger logger = LoggerFactory.getLogger(RecoverSqueezeMergeTask.class);
@@ -49,8 +50,9 @@ public class RecoverSqueezeMergeTask extends SqueezeMergeTask {
1, storageGroupName);
}
- public void recoverMerge() throws IOException {
- File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ // continueMerge does not work for squeeze strategy
+ public void recoverMerge(boolean continueMerge) throws IOException {
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, SqueezeMergeLogger.MERGE_LOG_NAME);
if (!logFile.exists()) {
logger.info("{} no merge.log, merge recovery ends", taskName);
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
index 96c4861..0bcde8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
@@ -10,7 +10,7 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.MergeCallback;
-import org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -29,7 +29,7 @@ public class SqueezeMergeTask implements Callable<Void> {
MergeResource resource;
String storageGroupSysDir;
String storageGroupName;
- private MergeLogger mergeLogger;
+ private SqueezeMergeLogger mergeLogger;
private MergeContext mergeContext = new MergeContext();
MergeCallback callback;
@@ -59,7 +59,7 @@ public class SqueezeMergeTask implements Callable<Void> {
// empty file lists to avoid files being deleted.
callback.call(
Collections.emptyList(), Collections.emptyList(), SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
- MergeLogger.MERGE_LOG_NAME), null);
+ SqueezeMergeLogger.MERGE_LOG_NAME), null);
throw e;
}
return null;
@@ -73,7 +73,7 @@ public class SqueezeMergeTask implements Callable<Void> {
long startTime = System.currentTimeMillis();
long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
resource.getUnseqFiles());
- mergeLogger = new MergeLogger(storageGroupSysDir);
+ mergeLogger = new SqueezeMergeLogger(storageGroupSysDir);
mergeLogger.logFiles(resource);
@@ -116,7 +116,7 @@ public class SqueezeMergeTask implements Callable<Void> {
}
File logFile = FSFactoryProducer.getFSFactory().getFile(storageGroupSysDir,
- MergeLogger.MERGE_LOG_NAME);
+ SqueezeMergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
// make sure merge.log is not deleted until unseqFiles are cleared so that when system
// reboots, the undeleted files can be deleted again
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3c4f982..98a83e4 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -35,19 +35,23 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.merge.IRecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.RecoverInplaceMergeTask;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MaxFileMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MaxSeriesMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
-import org.apache.iotdb.db.engine.merge.inplace.task.RecoverInplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.SqueezeMergeLogger;
+import org.apache.iotdb.db.engine.merge.squeeze.task.RecoverSqueezeMergeTask;
+import org.apache.iotdb.db.engine.merge.squeeze.task.SqueezeMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -207,7 +211,7 @@ public class StorageGroupProcessor {
}
private void recover() throws ProcessorException {
- logger.info("recover Storage Group {}", storageGroupName);
+ logger.info("recover Storage Group {}", storageGroupName);
try {
// collect TsFiles from sequential and unsequential data directory
@@ -223,11 +227,22 @@ public class StorageGroupProcessor {
if (mergingMods.exists()) {
mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverInplaceMergeTask recoverMergeTask = new RecoverInplaceMergeTask(seqTsFiles, unseqTsFiles,
- storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
- logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
- recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+
+ Class cls = judgeMergeStrategy();
+ if (cls != null) {
+ IRecoverMergeTask recoverMergeTask;
+ if (cls == InplaceMergeTask.class) {
+ recoverMergeTask = new RecoverInplaceMergeTask(seqTsFiles, unseqTsFiles,
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
+ } else {
+ recoverMergeTask = new RecoverSqueezeMergeTask(seqTsFiles, unseqTsFiles,
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName, storageGroupName);
+ }
+ logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
+ recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+ }
+
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
@@ -241,6 +256,20 @@ public class StorageGroupProcessor {
}
}
+ private Class judgeMergeStrategy() {
+ File inPlaceLogFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
+ InplaceMergeLogger.MERGE_LOG_NAME);
+ if (inPlaceLogFile.exists()) {
+ return InplaceMergeTask.class;
+ }
+ File squeezeLogFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
+ SqueezeMergeLogger.MERGE_LOG_NAME);
+ if (squeezeLogFile.exists()) {
+ return SqueezeMergeTask.class;
+ }
+ return null;
+ }
+
private List<TsFileResource> getAllFiles(List<String> folders) {
List<File> tsFiles = new ArrayList<>();
for (String baseDir : folders) {
@@ -863,10 +892,14 @@ public class StorageGroupProcessor {
long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList);
- IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
+
+ // TODO: choose a better strategy accordingly
+ MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+
+ IMergeFileSelector fileSelector = strategy.getFileSelector(mergeResource, budget);
try {
- List[] mergeFiles = fileSelector.select();
- if (mergeFiles.length == 0) {
+ fileSelector.select();
+ if (fileSelector.getSelectedSeqFiles().isEmpty() && fileSelector.getSelectedUnseqFiles().isEmpty()) {
logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
budget);
return;
@@ -878,13 +911,15 @@ public class StorageGroupProcessor {
// cached during selection
mergeResource.setCacheDeviceMeta(true);
- InplaceMergeTask mergeTask = new InplaceMergeTask(mergeResource, storageGroupSysDir.getPath(),
- this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
+ Callable<Void> mergeTask = strategy.getMergeTask(mergeResource, storageGroupSysDir.getPath(),
+ this::mergeEndAction, taskName, fileSelector.getConcurrentMergeNum(),
+ storageGroupName , fullMerge);
mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
if (logger.isInfoEnabled()) {
logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
- storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size());
+ storageGroupName, taskName, mergeResource.getSeqFiles().size(),
+ mergeResource.getUnseqFiles().size());
}
isMerging = true;
mergeStartTime = System.currentTimeMillis();
@@ -897,18 +932,6 @@ public class StorageGroupProcessor {
}
}
- private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
- MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
- switch (strategy) {
- case MAX_FILE_NUM:
- return new MaxFileMergeFileSelector(resource, budget);
- case MAX_SERIES_NUM:
- return new MaxSeriesMergeFileSelector(resource, budget);
- default:
- throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy);
- }
- }
-
private void handleInplaceMerge(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, File mergeLog) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxFileMergeFileSelectorTest.java
similarity index 58%
rename from server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
rename to server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxFileMergeFileSelectorTest.java
index a380c53..b51103e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxFileMergeFileSelectorTest.java
@@ -17,45 +17,48 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.inplace;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
+import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MaxFileMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.inplace.selector.InplaceMaxFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.junit.Test;
-public class MaxFileMergeFileSelectorTest extends MergeTest{
+public class MaxFileMergeFileSelectorTest extends MergeTest {
@Test
public void testFullSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- IMergeFileSelector mergeFileSelector = new MaxFileMergeFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ IMergeFileSelector mergeFileSelector = new InplaceMaxFileSelector(resource, Long.MAX_VALUE);
+ mergeFileSelector.select();
+ List<TsFileResource> seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ List<TsFileResource> unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources, seqSelected);
assertEquals(unseqResources, unseqSelected);
resource.clear();
resource = new MergeResource(seqResources.subList(0, 1), unseqResources);
- mergeFileSelector = new MaxFileMergeFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ mergeFileSelector = new InplaceMaxFileSelector(resource, Long.MAX_VALUE);
+ mergeFileSelector.select();
+ seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources, unseqSelected);
resource.clear();
resource = new MergeResource(seqResources, unseqResources.subList(0, 1));
- mergeFileSelector = new MaxFileMergeFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ mergeFileSelector = new InplaceMaxFileSelector(resource, Long.MAX_VALUE);
+ mergeFileSelector.select();
+ seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources.subList(0, 1), unseqSelected);
resource.clear();
@@ -64,19 +67,20 @@ public class MaxFileMergeFileSelectorTest extends MergeTest{
@Test
public void testNonSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- IMergeFileSelector mergeFileSelector = new MaxFileMergeFileSelector(resource, 1);
- List[] result = mergeFileSelector.select();
- assertEquals(0, result.length);
+ IMergeFileSelector mergeFileSelector = new InplaceMaxFileSelector(resource, 1);
+ mergeFileSelector.select();
+ assertTrue(mergeFileSelector.getSelectedSeqFiles().isEmpty());
+ assertTrue(mergeFileSelector.getSelectedUnseqFiles().isEmpty());
resource.clear();
}
@Test
public void testRestrictedSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- IMergeFileSelector mergeFileSelector = new MaxFileMergeFileSelector(resource, 400000);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ IMergeFileSelector mergeFileSelector = new InplaceMaxFileSelector(resource, 400000);
+ mergeFileSelector.select();
+ List<TsFileResource> seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ List<TsFileResource> unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 2), seqSelected);
assertEquals(unseqResources.subList(0, 2), unseqSelected);
resource.clear();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxSeriesMergeFileSelectorTest.java
similarity index 59%
rename from server/src/test/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelectorTest.java
rename to server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxSeriesMergeFileSelectorTest.java
index 3a55c38..01ada2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MaxSeriesMergeFileSelectorTest.java
@@ -17,47 +17,53 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.inplace;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
+import org.apache.iotdb.db.engine.merge.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.MergeTest;
+import org.apache.iotdb.db.engine.merge.inplace.selector.InplaceMaxFileSelector;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.selector.MaxSeriesMergeFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.junit.Test;
-public class MaxSeriesMergeFileSelectorTest extends MergeTest{
+public class MaxSeriesMergeFileSelectorTest extends MergeTest {
@Test
public void testFullSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- MaxSeriesMergeFileSelector mergeFileSelector = new MaxSeriesMergeFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ MaxSeriesMergeFileSelector mergeFileSelector =
+ new MaxSeriesMergeFileSelector(new InplaceMaxFileSelector(resource, Long.MAX_VALUE));
+ mergeFileSelector.select();
+ List<TsFileResource> seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ List<TsFileResource> unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources, seqSelected);
assertEquals(unseqResources, unseqSelected);
assertEquals(MaxSeriesMergeFileSelector.MAX_SERIES_NUM, mergeFileSelector.getConcurrentMergeNum());
resource.clear();
resource = new MergeResource(seqResources.subList(0, 1), unseqResources);
- mergeFileSelector = new MaxSeriesMergeFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ mergeFileSelector = new MaxSeriesMergeFileSelector(new InplaceMaxFileSelector(resource,
+ Long.MAX_VALUE));
+ mergeFileSelector.select();
+ seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources, unseqSelected);
assertEquals(MaxSeriesMergeFileSelector.MAX_SERIES_NUM, mergeFileSelector.getConcurrentMergeNum());
resource.clear();
resource = new MergeResource(seqResources, unseqResources.subList(0, 1));
- mergeFileSelector = new MaxSeriesMergeFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ mergeFileSelector = new MaxSeriesMergeFileSelector(new InplaceMaxFileSelector(resource,
+ Long.MAX_VALUE));
+ mergeFileSelector.select();
+ seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources.subList(0, 1), unseqSelected);
assertEquals(MaxSeriesMergeFileSelector.MAX_SERIES_NUM, mergeFileSelector.getConcurrentMergeNum());
@@ -67,20 +73,22 @@ public class MaxSeriesMergeFileSelectorTest extends MergeTest{
@Test
public void testNonSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- MaxSeriesMergeFileSelector mergeFileSelector = new MaxSeriesMergeFileSelector(resource, 1);
- List[] result = mergeFileSelector.select();
- assertEquals(0, result.length);
- assertEquals(0, mergeFileSelector.getConcurrentMergeNum());
+ MaxSeriesMergeFileSelector mergeFileSelector =
+ new MaxSeriesMergeFileSelector(new InplaceMaxFileSelector(resource, 1));
+ mergeFileSelector.select();
+ assertTrue(mergeFileSelector.getSelectedSeqFiles().isEmpty());
+ assertTrue(mergeFileSelector.getSelectedUnseqFiles().isEmpty());
resource.clear();
}
@Test
public void testRestrictedSelection() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- MaxSeriesMergeFileSelector mergeFileSelector = new MaxSeriesMergeFileSelector(resource, 400000);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ MaxSeriesMergeFileSelector mergeFileSelector =
+ new MaxSeriesMergeFileSelector(new InplaceMaxFileSelector(resource, 400000));
+ mergeFileSelector.select();
+ List<TsFileResource> seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ List<TsFileResource> unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 2), seqSelected);
assertEquals(unseqResources.subList(0, 2), unseqSelected);
assertEquals(MaxSeriesMergeFileSelector.MAX_SERIES_NUM, mergeFileSelector.getConcurrentMergeNum());
@@ -90,11 +98,11 @@ public class MaxSeriesMergeFileSelectorTest extends MergeTest{
@Test
public void testRestrictedSelection2() throws MergeException, IOException {
MergeResource resource = new MergeResource(seqResources, unseqResources);
- MaxSeriesMergeFileSelector mergeFileSelector = new MaxSeriesMergeFileSelector(resource,
- 100000);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ MaxSeriesMergeFileSelector mergeFileSelector = new MaxSeriesMergeFileSelector(
+ new InplaceMaxFileSelector(resource, 100000));
+ mergeFileSelector.select();
+ List<TsFileResource> seqSelected = mergeFileSelector.getSelectedSeqFiles();
+ List<TsFileResource> unseqSelected = mergeFileSelector.getSelectedUnseqFiles();
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources.subList(0, 1), unseqSelected);
assertEquals(34, mergeFileSelector.getConcurrentMergeNum());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
similarity index 96%
rename from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
rename to server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
index bfb1218..a0a73f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeLogTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.inplace;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -28,6 +28,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
similarity index 91%
rename from server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
rename to server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
index eae1982..1508415 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
@@ -17,35 +17,34 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.inplace;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-public class MergePerfTest extends MergeTest{
+public class MergePerfTest extends MergeTest {
- private long timeConsumption;
private boolean fullMerge;
- private File tempSGDir;
public void test() throws Exception {
MManager.getInstance().init();
- tempSGDir = new File("tempSG");
+ File tempSGDir = new File("tempSG");
tempSGDir.mkdirs();
setUp();
- timeConsumption = System.currentTimeMillis();
+ long timeConsumption = System.currentTimeMillis();
MergeResource resource = new MergeResource(seqResources, unseqResources);
resource.setCacheDeviceMeta(true);
InplaceMergeTask mergeTask =
new InplaceMergeTask(resource, tempSGDir.getPath(), (k, v
- , l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
+ , l, n) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
mergeTask.call();
timeConsumption = System.currentTimeMillis() - timeConsumption;
tearDown();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
similarity index 98%
copy from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
index 0b2b951..9801160 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergeTaskTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.inplace;
import static org.junit.Assert.assertEquals;
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/squeeze/MergeTaskTest.java
similarity index 98%
rename from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
rename to server/src/test/java/org/apache/iotdb/db/engine/merge/squeeze/MergeTaskTest.java
index 0b2b951..db2a361 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/squeeze/MergeTaskTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge;
+package org.apache.iotdb.db.engine.merge.squeeze;
import static org.junit.Assert.assertEquals;
@@ -26,8 +26,9 @@ import java.io.IOException;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6a5362f..4bae164 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -223,8 +223,8 @@ public class StorageGroupProcessorTest {
@Override
protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
- File mergeLog) {
- super.mergeEndAction(seqFiles, unseqFiles, mergeLog);
+ File mergeLog, TsFileResource newFile) {
+ super.mergeEndAction(seqFiles, unseqFiles, mergeLog, newFile);
mergeLock.incrementAndGet();
assertFalse(mergeLog.exists());
}