You are viewing a plain text version of this content; the canonical link was a hyperlink in the original HTML message and is not reproduced in this text extraction.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 08:53:08 UTC
[incubator-iotdb] branch dev_merge updated: add merge recovery
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new ad82c80 add merge recovery
ad82c80 is described below
commit ad82c803313466780fe1795fdf290bfc985b296c
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 16:50:48 2019 +0800
add merge recovery
---
.../apache/iotdb/db/engine/merge/MergeLogger.java | 117 ++++++++
.../apache/iotdb/db/engine/merge/MergeTask.java | 151 ++++++++---
.../iotdb/db/engine/merge/RecoverMergeTask.java | 296 +++++++++++++++++++++
.../db/engine/storagegroup/TsFileResource.java | 6 +-
.../write/TsFileNotCompleteException.java | 40 +++
.../org/apache/iotdb/tsfile/read/common/Path.java | 2 +-
.../write/writer/ForceAppendTsFileWriter.java | 52 +---
.../write/writer/RestorableTsFileIOWriter.java | 2 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 39 ++-
9 files changed, 619 insertions(+), 86 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java
new file mode 100644
index 0000000..64aeac9
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class MergeLogger {
+
+ public static final String MERGE_LOG_NAME = "merge.log";
+
+ public static final String STR_SEQ_FILES = "seqFiles";
+ public static final String STR_UNSEQ_FILES = "unseqFiles";
+ public static final String STR_START = "start";
+ public static final String STR_END = "end";
+ public static final String STR_ALL_TS_END = "all ts end";
+ public static final String STR_MERGE_START = "merge start";
+ public static final String STR_MERGE_END = "merge end";
+
+ private BufferedWriter logStream;
+
+ public MergeLogger(String storageGroupDir) throws IOException {
+ logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true));
+ }
+
+ public void close() throws IOException {
+ logStream.close();
+ }
+
+ public void logSeqFiles(List<TsFileResource> seqFiles) throws IOException {
+ logStream.write(STR_SEQ_FILES);
+ logStream.newLine();
+ for (TsFileResource tsFileResource : seqFiles) {
+ logStream.write(tsFileResource.getFile().getAbsolutePath());
+ }
+ logStream.flush();
+ }
+
+ public void logUnseqFiles(List<TsFileResource> unseqFiles) throws IOException {
+ logStream.write(STR_UNSEQ_FILES);
+ logStream.newLine();
+ for (TsFileResource tsFileResource : unseqFiles) {
+ logStream.write(tsFileResource.getFile().getAbsolutePath());
+ }
+ logStream.flush();
+ }
+
+ public void logMergeStart() throws IOException {
+ logStream.write(STR_MERGE_START);
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logTSStart(Path path) throws IOException {
+ logStream.write(path.getFullPath() + " " + STR_START);
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logFilePositionUpdate(File file) throws IOException {
+ logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logTSEnd(Path path) throws IOException {
+ logStream.write(path.getFullPath() + " " + STR_END);
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logAllTsEnd() throws IOException {
+ logStream.write(STR_ALL_TS_END);
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logFileMergeStart(File file, long position) throws IOException {
+ logStream.write(String.format("%s %d", file.getAbsolutePath(), position));
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logFileMergeEnd(File file) throws IOException {
+ logStream.write(file.getAbsolutePath() + " " + STR_END);
+ logStream.newLine();
+ logStream.flush();
+ }
+
+ public void logMergeEnd() throws IOException {
+ logStream.write(STR_MERGE_END);
+ logStream.newLine();
+ logStream.flush();
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index d0df83b..4ddbc2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -73,22 +74,24 @@ import org.slf4j.LoggerFactory;
public class MergeTask implements Callable<Void> {
private static final int MIN_CHUNK_POINT_NUM = 4096;
- private static final String MERGE_SUFFIX = ".merge";
+ public static final String MERGE_SUFFIX = ".merge";
private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
- private List<TsFileResource> seqFiles;
- private List<TsFileResource> unseqFiles;
+ protected List<TsFileResource> seqFiles;
+ protected List<TsFileResource> unseqFiles;
+ protected String storageGroupDir;
+ protected MergeLogger mergeLogger;
private TimeValuePair currTimeValuePair;
- private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
- private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
- private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
- private Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
+ protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
+ protected Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
+ protected Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
+ protected Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
- private Map<TsFileResource, Integer> mergedChunkCnt = new HashMap<>();
- private Map<TsFileResource, Integer> unmergedChunkCnt = new HashMap<>();
- private Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
+ protected Map<TsFileResource, Integer> mergedChunkCnt = new HashMap<>();
+ protected Map<TsFileResource, Integer> unmergedChunkCnt = new HashMap<>();
+ protected Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
private QueryContext mergeContext = new QueryContext();
@@ -96,10 +99,16 @@ public class MergeTask implements Callable<Void> {
private long currDeviceMaxTime;
+ protected MergeTask() {
+
+ }
+
public MergeTask(List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles) {
+ List<TsFileResource> unseqFiles, String storageGroupDir) throws IOException {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
+ this.storageGroupDir = storageGroupDir;
+ this.mergeLogger = new MergeLogger(storageGroupDir);
}
@Override
@@ -109,20 +118,22 @@ public class MergeTask implements Callable<Void> {
}
private void doMerge() throws MetadataErrorException, IOException {
+
+ logFiles();
+
List<Path> unmergedSeries = collectPathsInUnseqFiles();
+ mergeSeries(unmergedSeries);
- for (TsFileResource seqFile : seqFiles) {
- unmergedChunkStartTimes.put(seqFile, new HashMap<>());
- }
+ List<TsFileResource> unmergedFiles = seqFiles;
+ mergeFiles(unmergedFiles);
- // merge each series and write data into each seqFile's temp merge file
- for (Path path : unmergedSeries) {
- mergeOnePath(path);
- }
+ cleanUp(true);
+ }
+ protected void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
// decide whether to write the unmerged chunks to the merge files or to move the merged data
// back to the origin seqFile's
- for (TsFileResource seqFile : seqFiles) {
+ for (TsFileResource seqFile : unmergedFiles) {
int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
if (mergedChunkNum >= unmergedChunkNum) {
@@ -133,11 +144,30 @@ public class MergeTask implements Callable<Void> {
moveMergedToOld(seqFile);
}
}
+ mergeLogger.logMergeEnd();
+ }
+
+ protected void logFiles() throws IOException {
+ mergeLogger.logSeqFiles(seqFiles);
+ mergeLogger.logUnseqFiles(unseqFiles);
+ mergeLogger.logMergeStart();
+ }
- cleanUp();
+ protected void mergeSeries(List<Path> unmergedSeries) throws IOException {
+ for (TsFileResource seqFile : seqFiles) {
+ unmergedChunkStartTimes.put(seqFile, new HashMap<>());
+ }
+ // merge each series and write data into each seqFile's temp merge file
+ for (Path path : unmergedSeries) {
+ mergeLogger.logTSStart(path);
+ mergeOnePath(path);
+ mergeLogger.logTSEnd(path);
+ }
+ mergeLogger.logAllTsEnd();
}
- private void cleanUp() throws IOException {
+
+ protected void cleanUp(boolean executeCallback) throws IOException {
for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
sequenceReader.close();
}
@@ -149,12 +179,38 @@ public class MergeTask implements Callable<Void> {
mergedChunkCnt.clear();
unmergedChunkCnt.clear();
unmergedChunkStartTimes.clear();
+
+ mergeLogger.close();
+ File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
+ logFile.delete();
+ for (TsFileResource seqFile : seqFiles) {
+ File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
+ mergeFile.delete();
+ }
}
private void moveMergedToOld(TsFileResource seqFile) throws IOException {
+ int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
+ if (mergedChunkNum == 0) {
+ RestorableTsFileIOWriter newFileWriter = getMergeFileWriter(seqFile);
+ newFileWriter.close();
+ newFileWriter.getFile().delete();
+ fileWriterCache.remove(seqFile);
+ return;
+ }
+
seqFile.getMergeQueryLock().writeLock().lock();
try {
- ForceAppendTsFileWriter oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+ TsFileIOWriter oldFileWriter;
+ try {
+ oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+ mergeLogger.logFileMergeStart(seqFile.getFile(), ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+ ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+ } catch (TsFileNotCompleteException e) {
+ // this file may already be truncated if this merge is a system reboot merge
+ oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
+ }
+
oldFileWriter.filterChunks(unmergedChunkStartTimes.get(seqFile));
RestorableTsFileIOWriter newFileWriter = getMergeFileWriter(seqFile);
@@ -168,6 +224,10 @@ public class MergeTask implements Callable<Void> {
}
}
oldFileWriter.endFile(new FileSchema(oldFileWriter.getKnownSchema()));
+
+ seqFile.serialize();
+ mergeLogger.logFileMergeEnd(seqFile.getFile());
+
newFileWriter.getFile().delete();
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
@@ -192,31 +252,39 @@ public class MergeTask implements Callable<Void> {
fileWriter.endChunkGroup(maxVersion);
}
-
-
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
Map<Path, List<Long>> fileUnmergedChunkStartTimes = this.unmergedChunkStartTimes.get(seqFile);
RestorableTsFileIOWriter fileWriter = getMergeFileWriter(seqFile);
ChunkLoader chunkLoader = new ChunkLoaderImpl(getFileReader(seqFile));
- for (Entry<Path, List<Long>> entry : fileUnmergedChunkStartTimes.entrySet()) {
- Path path = entry.getKey();
- List<Long> chunkStartTimes = entry.getValue();
- if (chunkStartTimes.isEmpty()) {
- continue;
- }
+ mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
- List<ChunkMetaData> chunkMetaDataList = queryChunkMetadata(path, seqFile);
- MeasurementSchema measurementSchema = getSchema(path);
- IChunkWriter chunkWriter = getChunkWriter(measurementSchema);
+ int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
- fileWriter.startChunkGroup(path.getDevice());
- long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
- chunkWriter, measurementSchema, fileWriter);
- fileWriter.endChunkGroup(maxVersion);
+ if (unmergedChunkNum > 0) {
+ for (Entry<Path, List<Long>> entry : fileUnmergedChunkStartTimes.entrySet()) {
+ Path path = entry.getKey();
+ List<Long> chunkStartTimes = entry.getValue();
+ if (chunkStartTimes.isEmpty()) {
+ continue;
+ }
+
+ List<ChunkMetaData> chunkMetaDataList = queryChunkMetadata(path, seqFile);
+ MeasurementSchema measurementSchema = getSchema(path);
+ IChunkWriter chunkWriter = getChunkWriter(measurementSchema);
+
+ fileWriter.startChunkGroup(path.getDevice());
+ long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
+ chunkWriter, measurementSchema, fileWriter);
+ fileWriter.endChunkGroup(maxVersion);
+ }
}
+
fileWriter.endFile(new FileSchema(fileWriter.getKnownSchema()));
+ seqFile.serialize();
+ mergeLogger.logFileMergeEnd(fileWriter.getFile());
+
seqFile.getMergeQueryLock().writeLock().lock();
try {
FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
@@ -298,6 +366,7 @@ public class MergeTask implements Callable<Void> {
if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
unseqReader, mergeFileWriter, currTsFile, path)) {
mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion());
+ mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
}
currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
}
@@ -455,7 +524,7 @@ public class MergeTask implements Callable<Void> {
return pathModifications;
}
- private List<Path> collectPathsInUnseqFiles() throws MetadataErrorException {
+ protected List<Path> collectPathsInUnseqFiles() throws MetadataErrorException {
Set<String> devices = new HashSet<>();
for (TsFileResource tsFileResource : unseqFiles) {
devices.addAll(tsFileResource.getEndTimeMap().keySet());
@@ -481,7 +550,7 @@ public class MergeTask implements Callable<Void> {
return null;
}
- private RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
+ protected RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
if (writer == null) {
writer = new RestorableTsFileIOWriter(new File(resource.getFile().getPath() + MERGE_SUFFIX));
@@ -490,7 +559,7 @@ public class MergeTask implements Callable<Void> {
return writer;
}
- private MetadataQuerier getMetadataQuerier(TsFileResource seqFile) throws IOException {
+ protected MetadataQuerier getMetadataQuerier(TsFileResource seqFile) throws IOException {
MetadataQuerier metadataQuerier = metadataQuerierCache.get(seqFile);
if (metadataQuerier == null) {
metadataQuerier = new MetadataQuerierByFileImpl(getFileReader(seqFile));
@@ -499,7 +568,7 @@ public class MergeTask implements Callable<Void> {
return metadataQuerier;
}
- private List<ChunkMetaData> queryChunkMetadata(Path path, TsFileResource seqFile)
+ protected List<ChunkMetaData> queryChunkMetadata(Path path, TsFileResource seqFile)
throws IOException {
MetadataQuerier metadataQuerier = getMetadataQuerier(seqFile);
List<ChunkMetaData> chunkMetaDataList = metadataQuerier.getChunkMetaDataList(path);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
new file mode 100644
index 0000000..9234443
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.MetadataErrorException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+public class RecoverMergeTask extends MergeTask {
+
+ private String currLine;
+ private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
+ private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+
+ private Map<File, Long> fileLastPositions = new HashMap<>();
+ private Map<File, Long> tempFileLastPositions = new HashMap<>();
+
+ private List<Path> unmergedPaths;
+ private List<Path> mergedPaths = new ArrayList<>();
+ private List<TsFileResource> unmergedFiles;
+
+ public RecoverMergeTask(
+ List<TsFileResource> allSeqFiles,
+ List<TsFileResource> allUnseqFiles,
+ String storageGroupDir) throws IOException {
+ super();
+ this.seqFiles = allSeqFiles;
+ this.unseqFiles = allUnseqFiles;
+ this.storageGroupDir = storageGroupDir;
+ }
+
+ public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
+ File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
+ if (!logFile.exists()) {
+ return;
+ }
+ mergeSeqFiles = new ArrayList<>();
+ mergeUnseqFiles = new ArrayList<>();
+
+ Status status = determineStatus(logFile);
+ switch (status) {
+ case NONE:
+ logFile.delete();
+ break;
+ case FILES_LOGGED:
+ if (continueMerge) {
+ mergeLogger = new MergeLogger(storageGroupDir);
+ truncateFiles();
+ recoverChunkCounts();
+ mergeSeries(unmergedPaths);
+
+ mergeFiles(seqFiles);
+ }
+ cleanUp(continueMerge);
+ break;
+ case ALL_TS_MERGED:
+ if (continueMerge) {
+ mergeLogger = new MergeLogger(storageGroupDir);
+ truncateFiles();
+ recoverChunkCounts();
+ mergeFiles(unmergedFiles);
+ } else {
+ // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
+ // will recover them, so they are not a concern here
+ truncateFiles();
+ }
+ cleanUp(continueMerge);
+ break;
+ case MERGE_END:
+ cleanUp(continueMerge);
+ break;
+ }
+ }
+
+
+ // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
+ // move the merged chunks or the unmerged chunks
+ private void recoverChunkCounts() throws IOException {
+ for (TsFileResource tsFileResource : seqFiles) {
+ RestorableTsFileIOWriter mergeFileWriter = getMergeFileWriter(tsFileResource);
+ mergeFileWriter.makeMetadataVisible();
+ unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
+ for(Path path : mergedPaths) {
+ recoverChunkCounts(path, tsFileResource, mergeFileWriter);
+ }
+ }
+ }
+
+ private void recoverChunkCounts(Path path, TsFileResource tsFileResource,
+ RestorableTsFileIOWriter mergeFileWriter) throws IOException {
+ unmergedChunkStartTimes.get(tsFileResource).put(path, new ArrayList<>());
+
+ List<ChunkMetaData> seqFileChunks = queryChunkMetadata(path, tsFileResource);
+ List<ChunkMetaData> mergeFileChunks =
+ mergeFileWriter.getVisibleMetadatas(path.getDevice(), path.getMeasurement(), null);
+ mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
+ v + mergeFileChunks.size());
+ int seqIndex = 0;
+ int mergeIndex = 0;
+ int unmergedCnt = 0;
+ while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
+ ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
+ ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
+ if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
+ // this seqChunk is unmerged
+ unmergedCnt ++;
+ seqIndex ++;
+ unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
+ } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
+ seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
+ // this seqChunk is merged
+ seqIndex ++;
+ } else {
+ // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
+ // seqChunk
+ mergeIndex ++;
+ }
+ }
+ int finalUnmergedCnt = unmergedCnt;
+ unmergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt :
+ v + finalUnmergedCnt);
+ }
+
+ private void truncateFiles() throws IOException {
+ for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
+ File file = entry.getKey();
+ Long lastPosition = entry.getValue();
+ if (file.exists() && file.length() != lastPosition) {
+ try (FileInputStream fileInputStream = new FileInputStream(file)) {
+ FileChannel channel = fileInputStream.getChannel();
+ channel.truncate(lastPosition);
+ channel.close();
+ }
+ }
+ }
+ }
+
+ private Status determineStatus(File logFile) throws IOException, MetadataErrorException {
+ Status status = Status.NONE;
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new FileReader(logFile))) {
+ currLine = bufferedReader.readLine();
+ if (currLine != null) {
+ if (currLine.equals(MergeLogger.STR_SEQ_FILES)) {
+ analyzeSeqFiles(bufferedReader);
+ }
+ if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
+ analyzeUnseqFiles(bufferedReader);
+ }
+ if (currLine.equals(MergeLogger.STR_MERGE_START)) {
+ status = Status.FILES_LOGGED;
+ seqFiles = mergeSeqFiles;
+ unseqFiles = mergeUnseqFiles;
+ for (TsFileResource seqFile : seqFiles) {
+ File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
+ fileLastPositions.put(mergeFile, 0L);
+ }
+ unmergedPaths = collectPathsInUnseqFiles();
+ analyzeMergedSeries(bufferedReader, unmergedPaths);
+ }
+ if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
+ status = Status.ALL_TS_MERGED;
+ unmergedFiles = seqFiles;
+ analyzeMergedFiles(bufferedReader);
+ }
+ if (currLine.equals(MergeLogger.STR_MERGE_END)) {
+ status = Status.MERGE_END;
+ }
+ }
+ }
+ return status;
+ }
+
+ private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
+ while ((currLine = bufferedReader.readLine()) != null) {
+ if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
+ return;
+ }
+ Iterator<TsFileResource> iterator = seqFiles.iterator();
+ while (iterator.hasNext()) {
+ TsFileResource seqFile = iterator.next();
+ if (seqFile.getFile().getAbsolutePath().equals(currLine)) {
+ mergeSeqFiles.add(seqFile);
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ }
+
+ private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
+ while ((currLine = bufferedReader.readLine()) != null) {
+ if (currLine.equals(MergeLogger.STR_MERGE_START)) {
+ return;
+ }
+ Iterator<TsFileResource> iterator = unseqFiles.iterator();
+ while (iterator.hasNext()) {
+ TsFileResource unseqFile = iterator.next();
+ if (unseqFile.getFile().getAbsolutePath().equals(currLine)) {
+ mergeUnseqFiles.add(unseqFile);
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ }
+
+ private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
+ Path currTS = null;
+ while ((currLine = bufferedReader.readLine()) != null) {
+ if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
+ return;
+ }
+ if (currLine.contains(MergeLogger.STR_START)) {
+ // a TS starts to merge
+ String[] splits = currLine.split(" ");
+ currTS = new Path(splits[0]);
+ tempFileLastPositions.clear();
+ } else if (!currLine.contains(MergeLogger.STR_END)) {
+ // file position
+ String[] splits = currLine.split(" ");
+ File file = new File(splits[0]);
+ Long position = Long.parseLong(splits[1]);
+ tempFileLastPositions.put(file, position);
+ } else {
+ // a TS ends merging
+ unmergedPaths.remove(currTS);
+ for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
+ fileLastPositions.put(entry.getKey(), entry.getValue());
+ }
+ mergedPaths.add(currTS);
+ }
+ }
+ }
+
+ private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
+ File currFile = null;
+ while ((currLine = bufferedReader.readLine()) != null) {
+ if (currLine.equals(MergeLogger.STR_MERGE_END)) {
+ return;
+ }
+ if (currLine.contains(MergeLogger.STR_START)) {
+ String[] splits = currLine.split(" ");
+ currFile = new File(splits[0]);
+ Long lastPost = Long.parseLong(splits[2]);
+ fileLastPositions.put(currFile, lastPost);
+ } else if (currLine.contains(MergeLogger.STR_END)) {
+ fileLastPositions.remove(currFile);
+ String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
+ Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
+ while (unmergedFileIter.hasNext()) {
+ TsFileResource seqFile = unmergedFileIter.next();
+ if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
+ unmergedFileIter.remove();
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ enum Status {
+ NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6744473..df979c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -42,6 +43,7 @@ public class TsFileResource {
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
+ public static final String TEMP_SUFFIX = ".temp";
/**
* device -> start time
@@ -109,7 +111,7 @@ public class TsFileResource {
public void serialize() throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(
- new FileOutputStream(file + RESOURCE_SUFFIX))) {
+ new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX))) {
ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -121,6 +123,8 @@ public class TsFileResource {
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
+ FileUtils.moveFile(new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX),
+ new File(file + RESOURCE_SUFFIX));
}
public void deSerialize() throws IOException {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java
new file mode 100644
index 0000000..aa38fd9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.exception.write;
+
+import java.io.IOException;
+
+public class TsFileNotCompleteException extends IOException {
+
+ public TsFileNotCompleteException() {
+ }
+
+ public TsFileNotCompleteException(String message) {
+ super(message);
+ }
+
+ public TsFileNotCompleteException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TsFileNotCompleteException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 3d448d7..d552cd4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -189,7 +189,7 @@ public class Path implements Serializable {
@Override
public boolean equals(Object obj) {
- return obj != null && obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
+ return obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
}
public boolean equals(String obj) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 736b080..b5b0382 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -21,17 +21,13 @@ package org.apache.iotdb.tsfile.write.writer;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
@@ -41,6 +37,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class ForceAppendTsFileWriter extends TsFileIOWriter{
private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
+ private long truncatePosition;
public ForceAppendTsFileWriter(File file) throws IOException {
this.out = new DefaultTsFileOutput(file, true);
@@ -48,14 +45,14 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter{
// file doesn't exist
if (file.length() == 0 || !file.exists()) {
- throw new IOException("File " + file.getPath() + " is not a complete TsFile");
+ throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile");
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), true)) {
// this tsfile is not complete
if (!reader.isComplete()) {
- throw new IOException("File " + file.getPath() + " is not a complete TsFile");
+ throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile");
}
TsFileMetaData fileMetaData = reader.readFileMetadata();
Map<String, TsDeviceMetadataIndex> deviceMap = fileMetaData.getDeviceMap();
@@ -69,45 +66,18 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter{
deviceMetadataIndex.getOffset() : firstDeviceMetaPos;
}
// truncate metadata and marker
- out.truncate(firstDeviceMetaPos - 1);
+ truncatePosition = firstDeviceMetaPos - 1;
knownSchemas = fileMetaData.getMeasurementSchema();
+
}
}
- /**
- * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
- * @param chunkStartTimes
- */
- public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
- Map<Path, Integer> startTimeIdxes = new HashMap<>();
- chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+ public void doTruncate() throws IOException {
+ out.truncate(truncatePosition);
+ }
- Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
- while (chunkGroupMetaDataIterator.hasNext()) {
- ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
- String deviceId = chunkGroupMetaData.getDeviceID();
- int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
- Iterator<ChunkMetaData> chunkMetaDataIterator =
- chunkGroupMetaData.getChunkMetaDataList().iterator();
- while (chunkMetaDataIterator.hasNext()) {
- ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
- Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
- int startTimeIdx = startTimeIdxes.get(path);
- List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
- boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
- && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
- if (!chunkValid) {
- chunkGroupMetaDataIterator.remove();
- chunkNum--;
- invalidChunkNum++;
- } else {
- startTimeIdxes.put(path, startTimeIdx + 1);
- }
- }
- if (chunkNum == 0) {
- chunkGroupMetaDataIterator.remove();
- }
- }
+ public long getTruncatePosition() {
+ return truncatePosition;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index e7802a6..2c2399a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -128,7 +128,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
// filter: if a device'sensor is defined as float type, and data has been persistent.
// Then someone deletes the timeseries and recreate it with Int type. We have to ignore
// all the stale data.
- if (dataType.equals(chunkMetaData.getTsDataType())) {
+ if (dataType == null || dataType.equals(chunkMetaData.getTsDataType())) {
chunkMetaDatas.add(chunkMetaData);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2828b6f..f4750a3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -42,6 +43,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -356,7 +358,6 @@ public class TsFileIOWriter {
/**
* close the outputStream or file channel without writing FileMetadata.
- * This is just used for Testing.
*/
public void close() throws IOException {
canWrite = false;
@@ -389,4 +390,40 @@ public class TsFileIOWriter {
public File getFile() {
return file;
}
+
+ /**
+ * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
+ * @param chunkStartTimes
+ */
+ public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
+ Map<Path, Integer> startTimeIdxes = new HashMap<>();
+ chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+
+ Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
+ while (chunkGroupMetaDataIterator.hasNext()) {
+ ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+ String deviceId = chunkGroupMetaData.getDeviceID();
+ int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
+ Iterator<ChunkMetaData> chunkMetaDataIterator =
+ chunkGroupMetaData.getChunkMetaDataList().iterator();
+ while (chunkMetaDataIterator.hasNext()) {
+ ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
+ Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
+ int startTimeIdx = startTimeIdxes.get(path);
+ List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
+ boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
+ && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
+ if (!chunkValid) {
+ chunkGroupMetaDataIterator.remove();
+ chunkNum--;
+ invalidChunkNum++;
+ } else {
+ startTimeIdxes.put(path, startTimeIdx + 1);
+ }
+ }
+ if (chunkNum == 0) {
+ chunkGroupMetaDataIterator.remove();
+ }
+ }
+ }
}