You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/25 03:16:54 UTC
[incubator-iotdb] branch dev_new_merge_strategy updated: add
revocery in squeeze strategy
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
new 390bd3f add revocery in squeeze strategy
390bd3f is described below
commit 390bd3f743f3740ba1fed8904ca11b16d329fc80
Author: jt <jt...@163.com>
AuthorDate: Fri Oct 25 11:16:31 2019 +0800
add revocery in squeeze strategy
---
.../engine/merge/inplace/recover/LogAnalyzer.java | 6 +-
.../engine/merge/inplace/recover/MergeLogger.java | 2 +-
.../task/{MergeTask.java => InplaceMergeTask.java} | 10 +-
...MergeTask.java => RecoverInplaceMergeTask.java} | 17 +-
.../iotdb/db/engine/merge/manage/MergeManager.java | 4 +-
.../db/engine/merge/manage/MergeResource.java | 2 +-
.../engine/merge/squeeze/recover/LogAnalyzer.java | 185 +++------------------
.../engine/merge/squeeze/recover/MergeLogger.java | 52 +-----
.../engine/merge/squeeze/task/MergeSeriesTask.java | 14 +-
.../squeeze/task/RecoverSqueezeMergeTask.java | 100 +++++++++++
.../merge/squeeze/task/SqueezeMergeTask.java | 30 ++--
.../engine/storagegroup/StorageGroupProcessor.java | 29 +++-
.../apache/iotdb/db/engine/merge/MergeLogTest.java | 6 +-
.../iotdb/db/engine/merge/MergePerfTest.java | 6 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 42 ++---
15 files changed, 211 insertions(+), 294 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
index 9b1f957..73ba34e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
@@ -30,7 +30,7 @@ import java.util.Map.Entry;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -176,7 +176,7 @@ public class LogAnalyzer {
status = Status.MERGE_START;
for (TsFileResource seqFile : resource.getSeqFiles()) {
- File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
+ File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + InplaceMergeTask.MERGE_SUFFIX);
fileLastPositions.put(mergeFile, 0L);
}
@@ -238,7 +238,7 @@ public class LogAnalyzer {
fileLastPositions.put(currFile, lastPost);
} else {
fileLastPositions.remove(currFile);
- String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
+ String seqFilePath = currFile.getAbsolutePath().replace(InplaceMergeTask.MERGE_SUFFIX, "");
Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
while (unmergedFileIter.hasNext()) {
TsFileResource seqFile = unmergedFileIter.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
index 24f6244..d4a6b51 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
*/
public class MergeLogger {
- public static final String MERGE_LOG_NAME = "merge.log";
+ public static final String MERGE_LOG_NAME = "merge.log.inplace";
static final String STR_SEQ_FILES = "seqFiles";
static final String STR_UNSEQ_FILES = "unseqFiles";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
index c993cd6..11ad188 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
@@ -47,10 +47,10 @@ import org.slf4j.LoggerFactory;
* chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
* 3. remove unseqFiles
*/
-public class MergeTask implements Callable<Void> {
+public class InplaceMergeTask implements Callable<Void> {
- public static final String MERGE_SUFFIX = ".merge";
- private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+ public static final String MERGE_SUFFIX = ".merge.inplace";
+ private static final Logger logger = LoggerFactory.getLogger(InplaceMergeTask.class);
MergeResource resource;
String storageGroupSysDir;
@@ -63,7 +63,7 @@ public class MergeTask implements Callable<Void> {
String taskName;
boolean fullMerge;
- MergeTask(List<TsFileResource> seqFiles,
+ InplaceMergeTask(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
String taskName, boolean fullMerge, String storageGroupName) {
this.resource = new MergeResource(seqFiles, unseqFiles);
@@ -75,7 +75,7 @@ public class MergeTask implements Callable<Void> {
this.storageGroupName = storageGroupName;
}
- public MergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
+ public InplaceMergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
String taskName, boolean fullMerge, int concurrentMergeSeriesNum, String storageGroupName) {
this.resource = mergeResource;
this.storageGroupSysDir = storageGroupSysDir;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
index 93d0ae2..2c3a928 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
@@ -47,13 +47,13 @@ import org.slf4j.LoggerFactory;
* RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
* scanning merge.log using LogAnalyzer and continue the unfinished merge.
*/
-public class RecoverMergeTask extends MergeTask {
+public class RecoverInplaceMergeTask extends InplaceMergeTask {
- private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+ private static final Logger logger = LoggerFactory.getLogger(RecoverInplaceMergeTask.class);
private LogAnalyzer analyzer;
- public RecoverMergeTask(List<TsFileResource> seqFiles,
+ public RecoverInplaceMergeTask(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, String storageGroupSysDir,
MergeCallback callback, String taskName,
boolean fullMerge, String storageGroupName) {
@@ -160,9 +160,8 @@ public class RecoverMergeTask extends MergeTask {
long maxChunkNum = chunkNums[1];
long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
long newSingleSeriesSeqReadCost = fileMetaSize * maxChunkNum / totalChunkNum;
- singleSeriesSeqReadCost = newSingleSeriesSeqReadCost > singleSeriesSeqReadCost ?
- newSingleSeriesSeqReadCost : singleSeriesSeqReadCost;
- maxSeqReadCost = fileMetaSize > maxSeqReadCost ? fileMetaSize : maxSeqReadCost;
+ singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, singleSeriesSeqReadCost);
+ maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost);
seqWriteCost += fileMetaSize;
}
@@ -171,10 +170,8 @@ public class RecoverMergeTask extends MergeTask {
int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM;
int mid = (lb + ub) / 2;
while (mid != lb) {
- long unseqCost = singleSeriesUnseqCost * mid < maxUnseqCost ? singleSeriesUnseqCost * mid :
- maxUnseqCost;
- long seqReadCos = singleSeriesSeqReadCost * mid < maxSeqReadCost ?
- singleSeriesSeqReadCost * mid : maxSeqReadCost;
+ long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost);
+ long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, maxSeqReadCost);
long totalCost = unseqCost + seqReadCos + seqWriteCost;
if (totalCost <= memBudget) {
lb = mid;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index a6d8432..93223f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
@@ -56,7 +56,7 @@ public class MergeManager implements IService {
return INSTANCE;
}
- public void submitMainTask(MergeTask mergeTask) {
+ public void submitMainTask(InplaceMergeTask mergeTask) {
mergeTaskPool.submit(mergeTask);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index 7372b44..d0cbbd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -46,7 +46,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import static org.apache.iotdb.db.engine.merge.inplace.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
/**
* MergeResource manages files and caches of readers, writers, MeasurementSchemas and
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
index 04ffb6d..0354513 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
@@ -19,53 +19,35 @@
package org.apache.iotdb.db.engine.merge.squeeze.recover;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_ALL_TS_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_MERGE_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_MERGE_START;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_SEQ_FILES;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_START;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_TIMESERIES;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_UNSEQ_FILES;
+
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_ALL_TS_END;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_SEQ_FILES;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_UNSEQ_FILES;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MetadataErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* LogAnalyzer scans the "merge.log" file and recovers information such as files of last merge,
- * the last available positions of each file and how many timeseries and files have been merged.
- * An example of merging 1 seqFile and 1 unseqFile containing 3 series is:
+ * whether the new file is generated.
+ * An example of merging 1 seqFile and 1 unseqFile is:
* seqFiles
* server/0seq.tsfile
* unseqFiles
* server/0unseq.tsfile
* merge start
- * start root.mergeTest.device0.sensor0
- * server/0seq.tsfile.merge 338
- * end
- * start root.mergeTest.device0.sensor1
- * server/0seq.tsfile.merge 664
- * end
* all ts end
- * server/0seq.tsfile 145462
- * end
+ * server/0seq.tsfile.merge
* merge end
*/
public class LogAnalyzer {
@@ -76,23 +58,16 @@ public class LogAnalyzer {
private MergeResource resource;
private String taskName;
private File logFile;
- private String storageGroupName;
-
- private Map<File, Long> fileLastPositions = new HashMap<>();
- private Map<File, Long> tempFileLastPositions = new HashMap<>();
- private List<Path> mergedPaths = new ArrayList<>();
- private List<Path> unmergedPaths;
- private List<TsFileResource> unmergedFiles;
private String currLine;
+ private TsFileResource newResource;
private Status status;
- public LogAnalyzer(MergeResource resource, String taskName, File logFile, String storageGroupName) {
+ public LogAnalyzer(MergeResource resource, String taskName, File logFile) {
this.resource = resource;
this.taskName = taskName;
this.logFile = logFile;
- this.storageGroupName = storageGroupName;
}
/**
@@ -101,7 +76,7 @@ public class LogAnalyzer {
* @return a Status indicating the completed stage of the last merge.
* @throws IOException
*/
- public Status analyze() throws IOException, MetadataErrorException {
+ public Status analyze() throws IOException {
status = Status.NONE;
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
currLine = bufferedReader.readLine();
@@ -110,15 +85,7 @@ public class LogAnalyzer {
analyzeUnseqFiles(bufferedReader);
- List<String> storageGroupPaths = MManager.getInstance().getPaths(storageGroupName + ".*");
- unmergedPaths = new ArrayList<>();
- for (String path : storageGroupPaths) {
- unmergedPaths.add(new Path(path));
- }
-
- analyzeMergedSeries(bufferedReader, unmergedPaths);
-
- analyzeMergedFiles(bufferedReader);
+ analyzeMergedFile(bufferedReader);
}
}
return status;
@@ -159,7 +126,7 @@ public class LogAnalyzer {
long startTime = System.currentTimeMillis();
List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(STR_TIMESERIES)) {
+ if (currLine.equals(STR_ALL_TS_END)) {
break;
}
Iterator<TsFileResource> iterator = resource.getUnseqFiles().iterator();
@@ -180,124 +147,28 @@ public class LogAnalyzer {
resource.setUnseqFiles(mergeUnseqFiles);
}
- private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
- if (!STR_MERGE_START.equals(currLine)) {
- return;
- }
-
- status = Status.MERGE_START;
- for (TsFileResource seqFile : resource.getSeqFiles()) {
- File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
- fileLastPositions.put(mergeFile, 0L);
- }
-
- List<Path> currTSList = new ArrayList<>();
- long startTime = System.currentTimeMillis();
- while ((currLine = bufferedReader.readLine()) != null) {
- if (STR_ALL_TS_END.equals(currLine)) {
- break;
- }
- if (currLine.contains(STR_START)) {
- // a TS starts to merge
- String[] splits = currLine.split(" ");
- for (int i = 1; i < splits.length; i ++) {
- currTSList.add(new Path(splits[i]));
- }
- tempFileLastPositions.clear();
- } else if (!currLine.contains(STR_END)) {
- // file position
- String[] splits = currLine.split(" ");
- File file = SystemFileFactory.INSTANCE.getFile(splits[0]);
- Long position = Long.parseLong(splits[1]);
- tempFileLastPositions.put(file, position);
- } else {
- // a TS ends merging
- unmergedPaths.removeAll(currTSList);
- for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
- fileLastPositions.put(entry.getKey(), entry.getValue());
- }
- mergedPaths.addAll(currTSList);
- }
- }
- tempFileLastPositions = null;
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} series have already been merged after {}ms", taskName,
- mergedPaths.size(), (System.currentTimeMillis() - startTime));
- }
- }
- private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
+ private void analyzeMergedFile(BufferedReader bufferedReader) throws IOException {
if (!STR_ALL_TS_END.equals(currLine)) {
return;
}
- status = Status.ALL_TS_MERGED;
- unmergedFiles = resource.getSeqFiles();
+ currLine = bufferedReader.readLine();
+ if (currLine != null) {
+ status = Status.ALL_TS_MERGED;
+ File newFile = FSFactoryProducer.getFSFactory().getFile(currLine);
+ newResource = new TsFileResource(newFile);
+ newResource.deSerialize();
- File currFile = null;
- long startTime = System.currentTimeMillis();
- int mergedCnt = 0;
- while ((currLine = bufferedReader.readLine()) != null) {
- if (STR_MERGE_END.equals(currLine)) {
- status = Status.MERGE_END;
- break;
- }
- if (!currLine.contains(STR_END)) {
- String[] splits = currLine.split(" ");
- currFile = SystemFileFactory.INSTANCE.getFile(splits[0]);
- Long lastPost = Long.parseLong(splits[1]);
- fileLastPositions.put(currFile, lastPost);
- } else {
- fileLastPositions.remove(currFile);
- String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
- Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
- while (unmergedFileIter.hasNext()) {
- TsFileResource seqFile = unmergedFileIter.next();
- if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
- mergedCnt ++;
- unmergedFileIter.remove();
- break;
- }
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} found files have already been merged into {}", taskName,
+ newFile.getPath());
}
}
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} files have already been merged after {}ms", taskName,
- mergedCnt, (System.currentTimeMillis() - startTime));
- }
- }
-
- public List<Path> getUnmergedPaths() {
- return unmergedPaths;
- }
-
- public void setUnmergedPaths(List<Path> unmergedPaths) {
- this.unmergedPaths = unmergedPaths;
- }
-
- public List<TsFileResource> getUnmergedFiles() {
- return unmergedFiles;
- }
-
- public void setUnmergedFiles(
- List<TsFileResource> unmergedFiles) {
- this.unmergedFiles = unmergedFiles;
- }
-
- public List<Path> getMergedPaths() {
- return mergedPaths;
- }
-
- public void setMergedPaths(List<Path> mergedPaths) {
- this.mergedPaths = mergedPaths;
- }
-
- public Map<File, Long> getFileLastPositions() {
- return fileLastPositions;
}
- public void setFileLastPositions(Map<File, Long> fileLastPositions) {
- this.fileLastPositions = fileLastPositions;
+ public TsFileResource getNewResource() {
+ return newResource;
}
public enum Status {
@@ -305,9 +176,7 @@ public class LogAnalyzer {
NONE,
// at least the files and timeseries to be merged are known
MERGE_START,
- // all the timeseries have been merged(merged chunks are generated)
- ALL_TS_MERGED,
- // all the merge files are merged with the origin files and the task is almost done
- MERGE_END
+ // all the timeseries have been merged(new file is generated)
+ ALL_TS_MERGED
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
index b707dec..1f99557 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
@@ -34,16 +34,11 @@ import org.apache.iotdb.tsfile.read.common.Path;
*/
public class MergeLogger {
- public static final String MERGE_LOG_NAME = "merge.log";
+ public static final String MERGE_LOG_NAME = "merge.log.squeeze";
static final String STR_SEQ_FILES = "seqFiles";
static final String STR_UNSEQ_FILES = "unseqFiles";
- static final String STR_TIMESERIES = "timeseries";
- static final String STR_START = "start";
- static final String STR_END = "end";
static final String STR_ALL_TS_END = "all ts end";
- static final String STR_MERGE_START = "merge start";
- static final String STR_MERGE_END = "merge end";
private BufferedWriter logStream;
@@ -56,51 +51,12 @@ public class MergeLogger {
logStream.close();
}
- public void logTSStart(List<Path> paths) throws IOException {
- logStream.write(STR_START);
- for (Path path : paths) {
- logStream.write(" " + path.getFullPath());
- }
- logStream.newLine();
- logStream.flush();
- }
-
- public void logFilePosition(File file) throws IOException {
- logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
- logStream.newLine();
- logStream.flush();
- }
-
- public void logTSEnd() throws IOException {
- logStream.write(STR_END);
- logStream.newLine();
- logStream.flush();
- }
-
public void logAllTsEnd() throws IOException {
logStream.write(STR_ALL_TS_END);
logStream.newLine();
logStream.flush();
}
- public void logFileMergeStart(File file, long position) throws IOException {
- logStream.write(String.format("%s %d", file.getAbsolutePath(), position));
- logStream.newLine();
- logStream.flush();
- }
-
- public void logFileMergeEnd() throws IOException {
- logStream.write(STR_END);
- logStream.newLine();
- logStream.flush();
- }
-
- public void logMergeEnd() throws IOException {
- logStream.write(STR_MERGE_END);
- logStream.newLine();
- logStream.flush();
- }
-
public void logFiles(MergeResource resource) throws IOException {
logSeqFiles(resource.getSeqFiles());
logUnseqFiles(resource.getUnseqFiles());
@@ -126,12 +82,6 @@ public class MergeLogger {
logStream.flush();
}
- public void logMergeStart() throws IOException {
- logStream.write(STR_MERGE_START);
- logStream.newLine();
- logStream.flush();
- }
-
public void logNewFile(TsFileResource resource) throws IOException {
logStream.write(resource.getFile().getAbsolutePath());
logStream.newLine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
index 89796f8..ed14c6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
@@ -131,7 +131,6 @@ class MergeSeriesTask {
logger.info("{} all series are merged after {}ms", taskName,
System.currentTimeMillis() - startTime);
}
- mergeLogger.logAllTsEnd();
return newResource;
}
@@ -186,15 +185,10 @@ class MergeSeriesTask {
private void createNewFileWriter() throws IOException {
// use the minimum version as the version of the new file
- long currFileVersion = Long.MAX_VALUE;
- File parent = null;
- for (TsFileResource seqFile : resource.getSeqFiles()) {
- long fileVersion =
- Long.parseLong(seqFile.getFile().getName()
- .replace(TSFILE_SUFFIX, "").split(TSFILE_SEPARATOR)[1]);
- currFileVersion = Math.min(currFileVersion, fileVersion);
- parent = parent == null ? seqFile.getFile().getParentFile() : parent;
- }
+ long currFileVersion =
+ Long.parseLong(
+ resource.getSeqFiles().get(0).getFile().getName().replace(TSFILE_SUFFIX, "").split(TSFILE_SEPARATOR)[1]);
+ File parent = resource.getSeqFiles().get(0).getFile().getParentFile();
File newFile = FSFactoryProducer.getFSFactory().getFile(parent,
System.currentTimeMillis() + TSFILE_SEPARATOR + currFileVersion + TSFILE_SUFFIX + MERGE_SUFFIX);
newFileWriter = new RestorableTsFileIOWriter(newFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
new file mode 100644
index 0000000..c32ffb6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.squeeze.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.MergeCallback;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer.Status;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
+ * scanning merge.log using LogAnalyzer and continue the unfinished merge.
+ */
+public class RecoverSqueezeMergeTask extends SqueezeMergeTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RecoverSqueezeMergeTask.class);
+
+ public RecoverSqueezeMergeTask(List<TsFileResource> seqFiles,
+ List<TsFileResource> unseqFiles, String storageGroupSysDir,
+ MergeCallback callback, String taskName, String storageGroupName) {
+ super(new MergeResource(seqFiles, unseqFiles), storageGroupSysDir, callback, taskName,
+ 1, storageGroupName);
+ }
+
+ public void recoverMerge() throws IOException {
+ File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+ if (!logFile.exists()) {
+ logger.info("{} no merge.log, merge recovery ends", taskName);
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+
+ LogAnalyzer analyzer = new LogAnalyzer(resource, taskName, logFile);
+ Status status = analyzer.analyze();
+ if (logger.isInfoEnabled()) {
+ logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
+ (System.currentTimeMillis() - startTime));
+ }
+ switch (status) {
+ case NONE:
+ logFile.delete();
+ break;
+ case MERGE_START:
+ removeMergedFile();
+ // set the files to empty to let the StorageGroupProcessor do a clean up
+ resource.setSeqFiles(Collections.emptyList());
+ resource.setUnseqFiles(Collections.emptyList());
+ cleanUp(true);
+ break;
+ case ALL_TS_MERGED:
+ newResource = analyzer.getNewResource();
+ cleanUp(true);
+ break;
+ default:
+ throw new UnsupportedOperationException(taskName + " found unrecognized status " + status);
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info("{} merge recovery ends after {}ms", taskName,
+ (System.currentTimeMillis() - startTime));
+ }
+ }
+
+ private void removeMergedFile() {
+ File sgDir =
+ FSFactoryProducer.getFSFactory().getFile(resource.getSeqFiles().get(0).getFile().getParent());
+ File[] mergeFiles = sgDir.listFiles(file -> file.getName().endsWith(MERGE_SUFFIX));
+ if (mergeFiles != null) {
+ for (File file : mergeFiles) {
+ file.delete();
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
index 4e6ac3d..96c4861 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
@@ -23,20 +23,20 @@ import org.slf4j.LoggerFactory;
public class SqueezeMergeTask implements Callable<Void> {
- static final String MERGE_SUFFIX = ".merge";
+ static final String MERGE_SUFFIX = ".merge.squeeze";
private static final Logger logger = LoggerFactory.getLogger(SqueezeMergeTask.class);
- private MergeResource resource;
- private String storageGroupSysDir;
- private String storageGroupName;
+ MergeResource resource;
+ String storageGroupSysDir;
+ String storageGroupName;
private MergeLogger mergeLogger;
private MergeContext mergeContext = new MergeContext();
- private MergeCallback callback;
- private int concurrentMergeSeriesNum;
- private String taskName;
+ MergeCallback callback;
+ int concurrentMergeSeriesNum;
+ String taskName;
- private TsFileResource newResource;
+ TsFileResource newResource;
public SqueezeMergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
String taskName, int concurrentMergeSeriesNum, String storageGroupName) {
@@ -87,8 +87,6 @@ public class SqueezeMergeTask implements Callable<Void> {
unmergedSeries.add(new Path(path));
}
- mergeLogger.logMergeStart();
-
MergeSeriesTask mergeChunkTask = new MergeSeriesTask(mergeContext, taskName, mergeLogger, resource
,unmergedSeries, concurrentMergeSeriesNum);
newResource = mergeChunkTask.mergeSeries();
@@ -98,17 +96,16 @@ public class SqueezeMergeTask implements Callable<Void> {
double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
double seriesRate = unmergedSeries.size() / elapsedTime;
- double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
double fileRate =
(resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
- logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+ logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, "
+ "fileRate: {}/s, ptRate: {}/s",
- taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate, ptRate);
+ taskName, elapsedTime, byteRate, seriesRate, fileRate, ptRate);
}
}
- private void cleanUp(boolean executeCallback) throws IOException {
+ void cleanUp(boolean executeCallback) throws IOException {
logger.info("{} is cleaning up", taskName);
resource.clear();
@@ -118,11 +115,6 @@ public class SqueezeMergeTask implements Callable<Void> {
mergeLogger.close();
}
- for (TsFileResource seqFile : resource.getSeqFiles()) {
- File mergeFile = FSFactoryProducer.getFSFactory().getFile(seqFile.getFile().getPath() + MERGE_SUFFIX);
- mergeFile.delete();
- }
-
File logFile = FSFactoryProducer.getFSFactory().getFile(storageGroupSysDir,
MergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 64aae8e..3c4f982 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import static org.apache.iotdb.db.engine.merge.inplace.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -46,8 +46,8 @@ import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
import org.apache.iotdb.db.engine.merge.inplace.selector.MaxFileMergeFileSelector;
import org.apache.iotdb.db.engine.merge.inplace.selector.MaxSeriesMergeFileSelector;
import org.apache.iotdb.db.engine.merge.inplace.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
-import org.apache.iotdb.db.engine.merge.inplace.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.RecoverInplaceMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -223,7 +223,7 @@ public class StorageGroupProcessor {
if (mergingMods.exists()) {
mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
+ RecoverInplaceMergeTask recoverMergeTask = new RecoverInplaceMergeTask(seqTsFiles, unseqTsFiles,
storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
@@ -878,7 +878,7 @@ public class StorageGroupProcessor {
// cached during selection
mergeResource.setCacheDeviceMeta(true);
- MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
+ InplaceMergeTask mergeTask = new InplaceMergeTask(mergeResource, storageGroupSysDir.getPath(),
this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
@@ -962,7 +962,7 @@ public class StorageGroupProcessor {
}
// block new queries and insertions to prevent the seqFiles from changing
writeLock();
- mergeLock.writeLock().lock();;
+ mergeLock.writeLock().lock();
try {
removeUnseqFiles(unseqFiles);
// insert the new file into a proper place
@@ -1025,8 +1025,21 @@ public class StorageGroupProcessor {
if (unseqFiles.isEmpty()) {
// merge runtime exception arose, just end this merge
- isMerging = false;
- logger.info("{} a merge task abnormally ends", storageGroupName);
+ mergeLock.writeLock().lock();
+ try {
+ if (mergingModification != null) {
+ logger.debug("{} is updating the merged file's modification file", storageGroupName);
+ mergingModification.remove();
+ mergingModification = null;
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot remove merge modifications after merge abnormally ends",
+ storageGroupName, e);
+ } finally {
+ isMerging = false;
+ logger.info("{} a merge task abnormally ends", storageGroupName);
+ mergeLock.writeLock().unlock();
+ }
return;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 0c525c8..bfb1218 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -29,7 +29,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -58,8 +58,8 @@ public class MergeLogTest extends MergeTest {
@Test
public void testMergeLog() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
tempSGDir.getPath(), this::testCallBack, "test", false, 1, MERGE_TEST_SG);
mergeTask.call();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index 67082fd..eae1982 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -43,8 +43,8 @@ public class MergePerfTest extends MergeTest{
timeConsumption = System.currentTimeMillis();
MergeResource resource = new MergeResource(seqResources, unseqResources);
resource.setCacheDeviceMeta(true);
- MergeTask mergeTask =
- new MergeTask(resource, tempSGDir.getPath(), (k, v
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(resource, tempSGDir.getPath(), (k, v
, l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
mergeTask.call();
timeConsumption = System.currentTimeMillis() - timeConsumption;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index dc9d6e1..0b2b951 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -27,7 +27,7 @@ import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -60,9 +60,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testMerge() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v
- , l) -> {}, "test", false, 1, MERGE_TEST_SG);
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v
+ , l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
mergeTask.call();
QueryContext context = new QueryContext();
@@ -81,8 +81,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testFullMerge() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath()
+ , (k, v, l, n) -> {}, "test",
true, 1, MERGE_TEST_SG);
mergeTask.call();
@@ -103,8 +104,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testChunkNumThreshold() throws Exception {
IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath()
+ , (k, v, l, n) -> {}, "test",
false, 1, MERGE_TEST_SG);
mergeTask.call();
@@ -124,9 +126,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testPartialMerge1() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
- (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
+ (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
mergeTask.call();
QueryContext context = new QueryContext();
@@ -149,9 +151,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testPartialMerge2() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources.subList(5, 6)), tempSGDir.getPath(),
- (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(5, 6)), tempSGDir.getPath(),
+ (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
mergeTask.call();
QueryContext context = new QueryContext();
@@ -170,9 +172,9 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testPartialMerge3() throws Exception {
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 5)), tempSGDir.getPath(),
- (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 5)), tempSGDir.getPath(),
+ (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
mergeTask.call();
QueryContext context = new QueryContext();
@@ -200,9 +202,9 @@ public class MergeTaskTest extends MergeTest {
seqResources.get(0).getModFile().close();
- MergeTask mergeTask =
- new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
- (k, v, l) -> {
+ InplaceMergeTask mergeTask =
+ new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
+ (k, v, l, n) -> {
try {
seqResources.get(0).removeModFile();
} catch (IOException e) {