You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 03:36:13 UTC
[incubator-iotdb] branch dev_merge updated: refactor
RecoverMergeTask
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new b5b7400 refactor RecoverMergeTask
b5b7400 is described below
commit b5b7400d00b21aa6a94b406fe572f2be2e3698ab
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 11:33:49 2019 +0800
refactor RecoverMergeTask
---
.../LogAnalyzer.java} | 188 ++++------------
.../iotdb/db/engine/merge/task/MergeTask.java | 1 +
.../db/engine/merge/task/RecoverMergeTask.java | 239 ++++-----------------
3 files changed, 87 insertions(+), 341 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
similarity index 52%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index ebcc425..5c7a004 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -17,175 +17,49 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.task;
+package org.apache.iotdb.db.engine.merge.recover;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
-import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RecoverMergeTask extends MergeTask {
+public class LogAnalyzer {
- private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+ private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class);
- private String currLine;
- private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
- private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+ private MergeResource resource;
+ private String taskName;
+ private File logFile;
private Map<File, Long> fileLastPositions = new HashMap<>();
private Map<File, Long> tempFileLastPositions = new HashMap<>();
- private List<Path> unmergedPaths;
private List<Path> mergedPaths = new ArrayList<>();
+ private List<Path> unmergedPaths;
private List<TsFileResource> unmergedFiles;
+ private String currLine;
- public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
- boolean fullMerge) throws IOException {
- super(null, null, storageGroupDir, callback, taskName, fullMerge);
- }
-
- public void recoverMerge(boolean continueMerge) throws IOException {
- File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
- if (!logFile.exists()) {
- logger.info("{} no merge.log, merge recovery ends", taskName);
- return;
- }
- mergeSeqFiles = new ArrayList<>();
- mergeUnseqFiles = new ArrayList<>();
- long startTime = System.currentTimeMillis();
-
- Status status = determineStatus(logFile);
- if (logger.isInfoEnabled()) {
- logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
- (System.currentTimeMillis() - startTime));
- }
- switch (status) {
- case NONE:
- logFile.delete();
- break;
- case FILES_LOGGED:
- if (continueMerge) {
- mergeLogger = new MergeLogger(storageGroupDir);
- truncateFiles();
- recoverChunkCounts();
- MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedPaths);
- mergeSeriesTask.mergeSeries();
-
- MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
- mergeFileTask.mergeFiles();
- }
- cleanUp(continueMerge);
- break;
- case ALL_TS_MERGED:
- if (continueMerge) {
- mergeLogger = new MergeLogger(storageGroupDir);
- truncateFiles();
- recoverChunkCounts();
- resource.setSeqFiles(unmergedFiles);
- MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, mergeLogger, resource, unmergedFiles);
- mergeFileTask.mergeFiles();
- } else {
- // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
- // will recover them, so they are not a concern here
- truncateFiles();
- }
- cleanUp(continueMerge);
- break;
- case MERGE_END:
- cleanUp(continueMerge);
- break;
- }
- if (logger.isInfoEnabled()) {
- logger.info("{} merge recovery ends after {}ms", taskName,
- (System.currentTimeMillis() - startTime));
- }
- }
-
-
- // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
- // move the merged chunks or the unmerged chunks
- private void recoverChunkCounts() throws IOException {
- logger.debug("{} recovering chunk counts", taskName);
- for (TsFileResource tsFileResource : resource.getSeqFiles()) {
- RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
- mergeFileWriter.makeMetadataVisible();
- unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
- for(Path path : mergedPaths) {
- recoverChunkCounts(path, tsFileResource, mergeFileWriter);
- }
- }
- }
-
- private void recoverChunkCounts(Path path, TsFileResource tsFileResource,
- RestorableTsFileIOWriter mergeFileWriter) throws IOException {
- unmergedChunkStartTimes.get(tsFileResource).put(path, new ArrayList<>());
-
- List<ChunkMetaData> seqFileChunks = resource.queryChunkMetadata(path, tsFileResource);
- List<ChunkMetaData> mergeFileChunks =
- mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
- mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
- v + mergeFileChunks.size());
- int seqIndex = 0;
- int mergeIndex = 0;
- int unmergedCnt = 0;
- while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
- ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
- ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
- if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
- // this seqChunk is unmerged
- unmergedCnt ++;
- seqIndex ++;
- unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
- } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
- seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
- // this seqChunk is merged
- seqIndex ++;
- } else {
- // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
- // seqChunk
- mergeIndex ++;
- }
- }
- int finalUnmergedCnt = unmergedCnt;
- unmergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt :
- v + finalUnmergedCnt);
- }
-
- private void truncateFiles() throws IOException {
- logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
- for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
- File file = entry.getKey();
- Long lastPosition = entry.getValue();
- if (file.exists() && file.length() != lastPosition) {
- try (FileInputStream fileInputStream = new FileInputStream(file)) {
- FileChannel channel = fileInputStream.getChannel();
- channel.truncate(lastPosition);
- channel.close();
- }
- }
- }
+ public LogAnalyzer(MergeResource resource, String taskName, File logFile) {
+ this.resource = resource;
+ this.taskName = taskName;
+ this.logFile = logFile;
}
- private Status determineStatus(File logFile) throws IOException {
+ public Status analyze() throws IOException {
Status status = Status.NONE;
try (BufferedReader bufferedReader =
new BufferedReader(new FileReader(logFile))) {
@@ -199,8 +73,6 @@ public class RecoverMergeTask extends MergeTask {
}
if (currLine.equals(MergeLogger.STR_MERGE_START)) {
status = Status.FILES_LOGGED;
- resource.setSeqFiles(mergeSeqFiles);
- resource.setUnseqFiles(mergeUnseqFiles);
for (TsFileResource seqFile : resource.getSeqFiles()) {
File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
fileLastPositions.put(mergeFile, 0L);
@@ -223,6 +95,7 @@ public class RecoverMergeTask extends MergeTask {
private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
long startTime = System.currentTimeMillis();
+ List<TsFileResource> mergeSeqFiles = new ArrayList<>();
while ((currLine = bufferedReader.readLine()) != null) {
if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
break;
@@ -241,10 +114,12 @@ public class RecoverMergeTask extends MergeTask {
logger.debug("{} found {} seq files after {}ms", taskName, mergeSeqFiles.size(),
(System.currentTimeMillis() - startTime));
}
+ resource.setSeqFiles(mergeSeqFiles);
}
private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
long startTime = System.currentTimeMillis();
+ List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
while ((currLine = bufferedReader.readLine()) != null) {
if (currLine.equals(MergeLogger.STR_MERGE_START)) {
break;
@@ -263,6 +138,7 @@ public class RecoverMergeTask extends MergeTask {
logger.debug("{} found {} unseq files after {}ms", taskName, mergeUnseqFiles.size(),
(System.currentTimeMillis() - startTime));
}
+ resource.setUnseqFiles(mergeUnseqFiles);
}
private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
@@ -294,7 +170,7 @@ public class RecoverMergeTask extends MergeTask {
}
if (logger.isDebugEnabled()) {
logger.debug("{} found {} series have already been merged after {}ms", taskName,
- mergeSeqFiles.size(), (System.currentTimeMillis() - startTime));
+ mergedPaths.size(), (System.currentTimeMillis() - startTime));
}
}
@@ -304,7 +180,7 @@ public class RecoverMergeTask extends MergeTask {
int mergedCnt = 0;
while ((currLine = bufferedReader.readLine()) != null) {
if (currLine.equals(MergeLogger.STR_MERGE_END)) {
- break;
+ break;
}
if (currLine.contains(MergeLogger.STR_START)) {
String[] splits = currLine.split(" ");
@@ -327,11 +203,27 @@ public class RecoverMergeTask extends MergeTask {
}
if (logger.isDebugEnabled()) {
logger.debug("{} found {} files have already been merged after {}ms", taskName,
- mergedCnt, (System.currentTimeMillis() - startTime));
+ mergedCnt, (System.currentTimeMillis() - startTime));
}
}
- enum Status {
+ public enum Status {
NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
}
+
+ public List<Path> getUnmergedPaths() {
+ return unmergedPaths;
+ }
+
+ public List<TsFileResource> getUnmergedFiles() {
+ return unmergedFiles;
+ }
+
+ public List<Path> getMergedPaths() {
+ return mergedPaths;
+ }
+
+ public Map<File, Long> getFileLastPositions() {
+ return fileLastPositions;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 43e3d0e..64dd4da 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -134,6 +134,7 @@ public class MergeTask implements Callable<Void> {
File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
+ // make sure merge.log is not deleted before unseqFiles are cleared
callback.call(resource.getSeqFiles(), resource.getUnseqFiles(), logFile);
} else {
logFile.delete();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index ebcc425..809be34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -19,20 +19,17 @@
package org.apache.iotdb.db.engine.merge.task;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileReader;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
-import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -44,16 +41,8 @@ public class RecoverMergeTask extends MergeTask {
private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
- private String currLine;
- private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
- private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+ private LogAnalyzer analyzer;
- private Map<File, Long> fileLastPositions = new HashMap<>();
- private Map<File, Long> tempFileLastPositions = new HashMap<>();
-
- private List<Path> unmergedPaths;
- private List<Path> mergedPaths = new ArrayList<>();
- private List<TsFileResource> unmergedFiles;
public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
boolean fullMerge) throws IOException {
@@ -66,11 +55,10 @@ public class RecoverMergeTask extends MergeTask {
logger.info("{} no merge.log, merge recovery ends", taskName);
return;
}
- mergeSeqFiles = new ArrayList<>();
- mergeUnseqFiles = new ArrayList<>();
long startTime = System.currentTimeMillis();
- Status status = determineStatus(logFile);
+ analyzer = new LogAnalyzer(resource, taskName, logFile);
+ Status status = analyzer.analyze();
if (logger.isInfoEnabled()) {
logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
(System.currentTimeMillis() - startTime));
@@ -80,39 +68,16 @@ public class RecoverMergeTask extends MergeTask {
logFile.delete();
break;
case FILES_LOGGED:
- if (continueMerge) {
- mergeLogger = new MergeLogger(storageGroupDir);
- truncateFiles();
- recoverChunkCounts();
- MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedPaths);
- mergeSeriesTask.mergeSeries();
-
- MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
- mergeFileTask.mergeFiles();
- }
- cleanUp(continueMerge);
+ resumeAfterFilesLogged(continueMerge);
break;
case ALL_TS_MERGED:
- if (continueMerge) {
- mergeLogger = new MergeLogger(storageGroupDir);
- truncateFiles();
- recoverChunkCounts();
- resource.setSeqFiles(unmergedFiles);
- MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
- unmergedChunkStartTimes, mergeLogger, resource, unmergedFiles);
- mergeFileTask.mergeFiles();
- } else {
- // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
- // will recover them, so they are not a concern here
- truncateFiles();
- }
- cleanUp(continueMerge);
+ resumeAfterAllTsMerged(continueMerge);
break;
case MERGE_END:
cleanUp(continueMerge);
break;
+ default:
+ throw new UnsupportedOperationException(taskName + " found unrecognized status " + status);
}
if (logger.isInfoEnabled()) {
logger.info("{} merge recovery ends after {}ms", taskName,
@@ -120,6 +85,40 @@ public class RecoverMergeTask extends MergeTask {
}
}
+ private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
+ if (continueMerge) {
+ resumeMerge();
+ MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+ unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, analyzer.getUnmergedPaths());
+ mergeSeriesTask.mergeSeries();
+
+ MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
+ unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
+ mergeFileTask.mergeFiles();
+ }
+ cleanUp(continueMerge);
+ }
+
+ private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
+ if (continueMerge) {
+ resumeMerge();
+ MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
+ unmergedChunkStartTimes, mergeLogger, resource, analyzer.getUnmergedFiles());
+ mergeFileTask.mergeFiles();
+ } else {
+ // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
+ // will recover them, so they are not a concern here
+ truncateFiles();
+ }
+ cleanUp(continueMerge);
+ }
+
+ private void resumeMerge() throws IOException {
+ mergeLogger = new MergeLogger(storageGroupDir);
+ truncateFiles();
+ recoverChunkCounts();
+ }
+
// scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
// move the merged chunks or the unmerged chunks
@@ -129,7 +128,7 @@ public class RecoverMergeTask extends MergeTask {
RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
mergeFileWriter.makeMetadataVisible();
unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
- for(Path path : mergedPaths) {
+ for(Path path : analyzer.getMergedPaths()) {
recoverChunkCounts(path, tsFileResource, mergeFileWriter);
}
}
@@ -171,8 +170,8 @@ public class RecoverMergeTask extends MergeTask {
}
private void truncateFiles() throws IOException {
- logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
- for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
+ logger.debug("{} truncating {} files", taskName, analyzer.getFileLastPositions().size());
+ for (Entry<File, Long> entry : analyzer.getFileLastPositions().entrySet()) {
File file = entry.getKey();
Long lastPosition = entry.getValue();
if (file.exists() && file.length() != lastPosition) {
@@ -185,153 +184,7 @@ public class RecoverMergeTask extends MergeTask {
}
}
- private Status determineStatus(File logFile) throws IOException {
- Status status = Status.NONE;
- try (BufferedReader bufferedReader =
- new BufferedReader(new FileReader(logFile))) {
- currLine = bufferedReader.readLine();
- if (currLine != null) {
- if (currLine.equals(MergeLogger.STR_SEQ_FILES)) {
- analyzeSeqFiles(bufferedReader);
- }
- if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
- analyzeUnseqFiles(bufferedReader);
- }
- if (currLine.equals(MergeLogger.STR_MERGE_START)) {
- status = Status.FILES_LOGGED;
- resource.setSeqFiles(mergeSeqFiles);
- resource.setUnseqFiles(mergeUnseqFiles);
- for (TsFileResource seqFile : resource.getSeqFiles()) {
- File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
- fileLastPositions.put(mergeFile, 0L);
- }
- unmergedPaths = MergeUtils.collectPaths(resource);
- analyzeMergedSeries(bufferedReader, unmergedPaths);
- }
- if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
- status = Status.ALL_TS_MERGED;
- unmergedFiles = resource.getSeqFiles();
- analyzeMergedFiles(bufferedReader);
- }
- if (currLine.equals(MergeLogger.STR_MERGE_END)) {
- status = Status.MERGE_END;
- }
- }
- }
- return status;
- }
- private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
- long startTime = System.currentTimeMillis();
- while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
- break;
- }
- Iterator<TsFileResource> iterator = resource.getSeqFiles().iterator();
- while (iterator.hasNext()) {
- TsFileResource seqFile = iterator.next();
- if (seqFile.getFile().getAbsolutePath().equals(currLine)) {
- mergeSeqFiles.add(seqFile);
- iterator.remove();
- break;
- }
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} seq files after {}ms", taskName, mergeSeqFiles.size(),
- (System.currentTimeMillis() - startTime));
- }
- }
- private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
- long startTime = System.currentTimeMillis();
- while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(MergeLogger.STR_MERGE_START)) {
- break;
- }
- Iterator<TsFileResource> iterator = resource.getUnseqFiles().iterator();
- while (iterator.hasNext()) {
- TsFileResource unseqFile = iterator.next();
- if (unseqFile.getFile().getAbsolutePath().equals(currLine)) {
- mergeUnseqFiles.add(unseqFile);
- iterator.remove();
- break;
- }
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} seq files after {}ms", taskName, mergeUnseqFiles.size(),
- (System.currentTimeMillis() - startTime));
- }
- }
- private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
- Path currTS = null;
- long startTime = System.currentTimeMillis();
- while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
- break;
- }
- if (currLine.contains(MergeLogger.STR_START)) {
- // a TS starts to merge
- String[] splits = currLine.split(" ");
- currTS = new Path(splits[0]);
- tempFileLastPositions.clear();
- } else if (!currLine.contains(MergeLogger.STR_END)) {
- // file position
- String[] splits = currLine.split(" ");
- File file = new File(splits[0]);
- Long position = Long.parseLong(splits[1]);
- tempFileLastPositions.put(file, position);
- } else {
- // a TS ends merging
- unmergedPaths.remove(currTS);
- for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
- fileLastPositions.put(entry.getKey(), entry.getValue());
- }
- mergedPaths.add(currTS);
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} series have already been merged after {}ms", taskName,
- mergeSeqFiles.size(), (System.currentTimeMillis() - startTime));
- }
- }
-
- private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
- File currFile = null;
- long startTime = System.currentTimeMillis();
- int mergedCnt = 0;
- while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(MergeLogger.STR_MERGE_END)) {
- break;
- }
- if (currLine.contains(MergeLogger.STR_START)) {
- String[] splits = currLine.split(" ");
- currFile = new File(splits[0]);
- Long lastPost = Long.parseLong(splits[2]);
- fileLastPositions.put(currFile, lastPost);
- } else if (currLine.contains(MergeLogger.STR_END)) {
- fileLastPositions.remove(currFile);
- String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
- Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
- while (unmergedFileIter.hasNext()) {
- TsFileResource seqFile = unmergedFileIter.next();
- if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
- mergedCnt ++;
- unmergedFileIter.remove();
- break;
- }
- }
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("{} found {} files have already been merged after {}ms", taskName,
- mergedCnt, (System.currentTimeMillis() - startTime));
- }
- }
-
- enum Status {
- NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
- }
}