You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/23 09:25:09 UTC
[incubator-iotdb] branch rel/0.10 updated: Improve the recover
process (#1545)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/rel/0.10 by this push:
new a25db18 Improve the recover process (#1545)
a25db18 is described below
commit a25db189f1d2b21efca4375cb21b255284979fbb
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Jul 23 17:25:01 2020 +0800
Improve the recover process (#1545)
* refactor recover process
---
.../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 239 ++++++++++-----------
.../writelog/recover/TsFileRecoverPerformer.java | 26 +--
.../iotdb/tsfile/read/TsFileCheckStatus.java | 1 -
.../iotdb/tsfile/read/TsFileSequenceReader.java | 79 +++----
.../tsfile/write/writer/LocalTsFileOutput.java | 4 +-
.../write/writer/RestorableTsFileIOWriter.java | 42 ++--
.../iotdb/tsfile/write/writer/TsFileOutput.java | 4 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 17 +-
9 files changed, 203 insertions(+), 213 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 8693eb2..aec1a58 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput {
}
@Override
- public void truncate(long position) throws IOException {
+ public void truncate(long size) throws IOException {
if (fs.exists(path)) {
fsDataOutputStream.close();
}
- fs.truncate(path, position);
+ fs.truncate(path, size);
if (fs.exists(path)) {
fsDataOutputStream = fs.append(path);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f1818ec..1f66ae2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -72,7 +72,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -228,22 +227,20 @@ public class StorageGroupProcessor {
private TsFileFlushPolicy fileFlushPolicy;
/**
- * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
- * not including the files generated by merge) of each partition.
- * As data file close is managed by the leader in the distributed version, the files with the
- * same version(s) have the same data, despite that the inner structure (the size and
- * organization of chunks) may be different, so we can easily find what remote files we do not
- * have locally.
- * partition number -> version number set
+ * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+ * including the files generated by merge) of each partition. As data file close is managed by the
+ * leader in the distributed version, the files with the same version(s) have the same data,
+ * despite that the inner structure (the size and organization of chunks) may be different, so we
+ * can easily find what remote files we do not have locally. partition number -> version number
+ * set
*/
private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
/**
- * The max file versions in each partition. By recording this, if several IoTDB instances have
- * the same policy of closing file and their ingestion is identical, then files of the same
- * version in different IoTDB instance will have identical data, providing convenience for data
- * comparison across different instances.
- * partition number -> max version number
+ * The max file versions in each partition. By recording this, if several IoTDB instances have the
+ * same policy of closing file and their ingestion is identical, then files of the same version in
+ * different IoTDB instance will have identical data, providing convenience for data comparison
+ * across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
@@ -271,27 +268,29 @@ public class StorageGroupProcessor {
try {
// collect candidate TsFiles from sequential and unsequential data directory
Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllSequenceFileFolders());
+ DirectoryManager.getInstance().getAllSequenceFileFolders());
List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
upgradeSeqFileList.addAll(oldSeqTsFiles);
Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+ DirectoryManager.getInstance().getAllUnSequenceFileFolders());
List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
- recoverSeqFiles(tmpSeqTsFiles);
- recoverUnseqFiles(tmpUnseqTsFiles);
+ recoverTsFiles(tmpSeqTsFiles, true, workSequenceTsFileProcessors, sequenceFileTreeSet);
+ recoverTsFiles(tmpUnseqTsFiles, false, workUnsequenceTsFileProcessors, unSequenceFileList);
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
@@ -316,7 +315,6 @@ public class StorageGroupProcessor {
throw new StorageGroupProcessorException(e);
}
-
for (TsFileResource resource : sequenceFileTreeSet) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
@@ -345,11 +343,11 @@ public class StorageGroupProcessor {
/**
* use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
* partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap
- *
*/
private void updateLastestFlushedTime() throws IOException {
- VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath());
+ VersionController versionController = new SimpleFileVersionController(
+ storageGroupSysDir.getPath());
long currentVersion = versionController.currVersion();
for (TsFileResource resource : upgradeSeqFileList) {
for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -358,24 +356,27 @@ public class StorageGroupProcessor {
long endTime = resource.getEndTime(index);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
- .put(deviceId, endTime);
+ .put(deviceId, endTime);
globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
// set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
while (partitionId <= endTimePartitionId) {
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
- .put(deviceId, Long.MAX_VALUE);
+ .put(deviceId, Long.MAX_VALUE);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
- File directory = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
- if(!directory.exists()){
+ File directory = SystemFileFactory.INSTANCE
+ .getFile(storageGroupSysDir, String.valueOf(partitionId));
+ if (!directory.exists()) {
directory.mkdirs();
}
- File versionFile = SystemFileFactory.INSTANCE.getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+ File versionFile = SystemFileFactory.INSTANCE
+ .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
if (!versionFile.createNewFile()) {
logger.warn("Version file {} has already been created ", versionFile);
}
- timePartitionIdVersionControllerMap.put(partitionId, new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+ timePartitionIdVersionControllerMap.put(partitionId,
+ new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
}
partitionId++;
}
@@ -401,7 +402,8 @@ public class StorageGroupProcessor {
});
}
- private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException {
+ private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
+ throws IOException {
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
@@ -419,9 +421,12 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(fileFolder, MERGE_SUFFIX);
- File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
- File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
- File[] oldModificationFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+ File[] oldTsfileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+ File[] oldResourceFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+ File[] oldModificationFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
// move the old files to upgrade folder if exists
if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -511,13 +516,14 @@ public class StorageGroupProcessor {
}
}
- private void recoverSeqFiles(List<TsFileResource> tsFiles) {
+ private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq,
+ TreeMap<Long, TsFileProcessor> treeMap, Collection<TsFileResource> tsFileResources) {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
- getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
+ getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, !isSeq,
i == tsFiles.size() - 1);
RestorableTsFileIOWriter writer;
@@ -532,52 +538,26 @@ public class StorageGroupProcessor {
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
// the last file is not closed, continue writing to it
- TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessorCallBack,
- this::updateLatestFlushTimeCallback, true, writer);
- workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
- tsFileResource.setProcessor(tsFileProcessor);
- tsFileResource.removeResourceFile();
- tsFileProcessor.setTimeRangeId(timePartitionId);
- writer.makeMetadataVisible();
- }
- sequenceFileTreeSet.add(tsFileResource);
- }
- }
-
- private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
- for (int i = 0; i < tsFiles.size(); i++) {
- TsFileResource tsFileResource = tsFiles.get(i);
- long timePartitionId = tsFileResource.getTimePartition();
+ TsFileProcessor tsFileProcessor;
+ if (isSeq) {
+ tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+ getVersionControllerByTimePartitionId(timePartitionId),
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::updateLatestFlushTimeCallback, true, writer);
+ } else {
+ tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+ getVersionControllerByTimePartitionId(timePartitionId),
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::unsequenceFlushCallback, false, writer);
+ }
- TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
- getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
- i == tsFiles.size() - 1);
- RestorableTsFileIOWriter writer;
- try {
- writer = recoverPerformer.recover();
- } catch (StorageGroupProcessorException e) {
- logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getPath(), e);
- continue;
- }
- if (i != tsFiles.size() - 1 || !writer.canWrite()) {
- // not the last file or cannot write, just close it
- tsFileResource.setClosed(true);
- } else if (writer.canWrite()) {
- // the last file is not closed, continue writing to it
- TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessorCallBack,
- this::unsequenceFlushCallback, false, writer);
- workUnsequenceTsFileProcessors
- .put(timePartitionId, tsFileProcessor);
+ treeMap.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileResource.removeResourceFile();
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
}
- unSequenceFileList.add(tsFileResource);
+ tsFileResources.add(tsFileResource);
}
}
@@ -609,7 +589,8 @@ public class StorageGroupProcessor {
long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
@@ -648,7 +629,8 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition = StorageEngine
+ .getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -709,15 +691,15 @@ public class StorageGroupProcessor {
}
/**
- * insert batch to tsfile processor thread-safety that the caller need to guarantee
- * The rows to be inserted are in the range [start, end)
+ * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
+ * inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
*/
private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
@@ -845,10 +827,10 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param fileList file list to add new processor
- * @param sequence whether is sequence or not
+ * @param fileList file list to add new processor
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1228,7 +1210,7 @@ public class StorageGroupProcessor {
.query(deviceId, measurementId, schema.getType(), schema.getEncodingType(),
schema.getProps(), context);
- tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(),
+ tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(),
tsFileResource.getDeviceToIndexMap(),
tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), pair.left,
pair.right));
@@ -1265,7 +1247,8 @@ public class StorageGroupProcessor {
int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
long startTime = tsFileResource.getStartTime(deviceIndex);
- long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+ long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
+ : Long.MAX_VALUE;
if (!isAlive(endTime)) {
return false;
@@ -1282,9 +1265,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
+ * @param timestamp the delete range is (0, timestamp].
*/
public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
// TODO: how to avoid partial deletion?
@@ -1339,7 +1322,8 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+ private void logDeletion(long timestamp, String deviceId, String measurementId,
+ long timePartitionId)
throws IOException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1421,7 +1405,8 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
if (globalLatestFlushedTimeForEachDevice
.getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
@@ -1434,10 +1419,11 @@ public class StorageGroupProcessor {
/**
* used for upgrading
*/
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, String deviceId, long time) {
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
+ String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
/**
@@ -1490,9 +1476,9 @@ public class StorageGroupProcessor {
List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
for (TsFileResource resource : upgradedResources) {
long partitionId = resource.getTimePartition();
- resource.getDeviceToIndexMap().forEach((device, index) ->
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
- resource.getEndTime(index))
+ resource.getDeviceToIndexMap().forEach((device, index) ->
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+ resource.getEndTime(index))
);
}
insertLock.writeLock().lock();
@@ -1506,7 +1492,7 @@ public class StorageGroupProcessor {
}
mergeLock.writeLock().unlock();
insertLock.writeLock().unlock();
-
+
// after upgrade complete, update partitionLatestFlushedTimeForEachDevice
if (countUpgradeFiles() == 0) {
for (Entry<Long, Map<String, Long>> entry : newlyFlushedPartitionLatestFlushedTimeForEachDevice
@@ -1678,7 +1664,7 @@ public class StorageGroupProcessor {
if (seqFile.getWriteQueryLock().writeLock().isHeldByCurrentThread()) {
seqFile.getWriteQueryLock().writeLock().unlock();
}
- if(mergeLock.writeLock().isHeldByCurrentThread()) {
+ if (mergeLock.writeLock().isHeldByCurrentThread()) {
mergeLock.writeLock().unlock();
}
}
@@ -1717,7 +1703,7 @@ public class StorageGroupProcessor {
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
- newFilePartitionId)){
+ newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1772,7 +1758,8 @@ public class StorageGroupProcessor {
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
- newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
+ newTsFileResource
+ .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
}
}
loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -1784,7 +1771,8 @@ public class StorageGroupProcessor {
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1798,12 +1786,14 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+ * them.
+ *
* @param newTsFileResource
* @param newFilePartitionId
- * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file
- * an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+ * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
List<TsFileResource> sequenceList) {
@@ -1844,11 +1834,11 @@ public class StorageGroupProcessor {
/**
* Compare each device in the two files to find the time relation of them.
+ *
* @param fileA
* @param fileB
- * @return -1 if fileA is totally older than fileB (A < B)
- * 0 if fileA is partially older than fileB and partially newer than fileB (A X B)
- * 1 if fileA is totally newer than fileB (B < A)
+ * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
@@ -1885,9 +1875,9 @@ public class StorageGroupProcessor {
/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
- * unnecessary merge. Only used when the file sender and the receiver share the same file
- * close policy.
- * Warning: DO NOT REMOVE
+ * unnecessary merge. Only used when the file sender and the receiver share the same file close
+ * policy. Warning: DO NOT REMOVE
+ *
* @param resource
*/
@SuppressWarnings("unused")
@@ -1948,9 +1938,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
- * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
- * + 1]
+ * @param tsfileName origin tsfile name
+ * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2014,12 +2004,12 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @UsedBy sync module, load external tsfile module.
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
+ * @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, long filePartitionId)
@@ -2027,9 +2017,11 @@ public class StorageGroupProcessor {
File targetFile;
switch (type) {
case LOAD_UNSEQUENCE:
- targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
- .getFile().getName());
+ targetFile = fsFactory
+ .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ storageGroupName + File.separatorChar + filePartitionId + File.separator
+ + tsFileResource
+ .getFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
@@ -2088,7 +2080,8 @@ public class StorageGroupProcessor {
}
partitionDirectFileVersions.computeIfAbsent(filePartitionId,
p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(filePartitionId,
+ Collections.max(tsFileResource.getHistoricalVersions()));
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index c4818e3..38ab3c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -76,7 +76,7 @@ public class TsFileRecoverPerformer {
* 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected
* data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
*
- * @return a RestorableTsFileIOWriter if the file is not closed before crush, so this writer can
+ * @return a RestorableTsFileIOWriter if the file is not closed before crash, so this writer can
* be used to continue writing
*/
public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
@@ -127,20 +127,20 @@ public class TsFileRecoverPerformer {
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(restorableTsFileIOWriter);
- }
- // redo logs
- redoLogs(restorableTsFileIOWriter);
+ // redo logs
+ redoLogs(restorableTsFileIOWriter);
- // clean logs
- try {
- MultiFileLogNodeManager.getInstance()
- .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
- } catch (IOException e) {
- throw new StorageGroupProcessorException(e);
- }
+ // clean logs
+ try {
+ MultiFileLogNodeManager.getInstance()
+ .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+ } catch (IOException e) {
+ throw new StorageGroupProcessorException(e);
+ }
- return restorableTsFileIOWriter;
+ return restorableTsFileIOWriter;
+ }
}
private void recoverResourceFromFile() throws IOException {
@@ -206,10 +206,10 @@ public class TsFileRecoverPerformer {
// end the file if it is not the last file or it is closed before crush
restorableTsFileIOWriter.endFile();
resource.cleanCloseFlag();
+ resource.serialize();
}
// otherwise this file is not closed before crush, do nothing so we can continue writing
// into it
- resource.serialize();
} catch (IOException | InterruptedException | ExecutionException e) {
throw new StorageGroupProcessorException(e);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java
index 0217147..fb00e62 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.read;
public class TsFileCheckStatus {
public static final long COMPLETE_FILE = -1;
- public static final long ONLY_MAGIC_HEAD = -2;
public static final long INCOMPATIBLE_FILE = -3;
public static final long FILE_NOT_FOUND = -4;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index d259f1d..f8a6133 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param file -given file name
+ * @param file -given file name
* @param loadMetadataSize -whether load meta data size
*/
public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
@@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param input -given input
+ * @param input -given input
* @param loadMetadataSize -load meta data size
*/
public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
@@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param input the input of a tsfile. The current position should be a markder and then a chunk
- * Header, rather than the magic number
- * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
- * of the input to the current position
+ * @param input the input of a tsfile. The current position should be a markder and
+ * then a chunk Header, rather than the magic number
+ * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
+ * of the input to the current position
* @param fileMetadataSize the byte size of the file metadata in the input
*/
public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
@@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
*
- * @param metadataIndex MetadataIndexEntry
- * @param buffer byte buffer
- * @param deviceId String
+ * @param metadataIndex MetadataIndexEntry
+ * @param buffer byte buffer
+ * @param deviceId String
* @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
*/
private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
@@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable {
* Get target MetadataIndexEntry and its end offset
*
* @param metadataIndex given MetadataIndexNode
- * @param name target device / measurement name
- * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When
- * searching for a device node, return when it is not INTERNAL_DEVICE. Likewise, when searching
- * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the
- * situation when the index tree does NOT have the device level and ONLY has the measurement
- * level.
+ * @param name target device / measurement name
+ * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or
+ * INTERNAL_MEASUREMENT. When searching for a device node, return when it is
+ * not INTERNAL_DEVICE. Likewise, when searching for a measurement node,
+ * return when it is not INTERNAL_MEASUREMENT. This works for the situation
+ * when the index tree does NOT have the device level and ONLY has the
+ * measurement level.
* @return target MetadataIndexEntry, endOffset pair
*/
private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
@@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
*
- * @param position the offset of the chunk group footer in the file
+ * @param position the offset of the chunk group footer in the file
* @param markerRead true if the offset does not contains the marker , otherwise false
* @return a CHUNK_GROUP_FOOTER
* @throws IOException io error
@@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read the chunk's header.
*
- * @param position the file offset of this chunk's header
+ * @param position the file offset of this chunk's header
* @param chunkHeaderSize the size of chunk's header
- * @param markerRead true if the offset does not contains the marker , otherwise false
+ * @param markerRead true if the offset does not contains the marker , otherwise false
*/
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
throws IOException {
@@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* changed.
*
* @param position the start position of data in the tsFileInput, or the current position if
- * position = -1
- * @param size the size of data that want to read
+ * position = -1
+ * @param size the size of data that want to read
* @return data that been read.
*/
private ByteBuffer readData(long position, int size) throws IOException {
@@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* position.
*
* @param start the start position of data in the tsFileInput, or the current position if position
- * = -1
- * @param end the end position of data that want to read
+ * = -1
+ * @param end the end position of data that want to read
* @return data that been read.
*/
private ByteBuffer readData(long start, long end) throws IOException {
@@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Self Check the file and return the position before where the data is safe.
*
- * @param newSchema the schema on each time series in the file
+ * @param newSchema the schema on each time series in the file
* @param chunkGroupMetadataList ChunkGroupMetadata List
- * @param versionInfo version pair List
- * @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList
- * parameter will be not modified.
+ * @param versionInfo version pair List
+ * @param fastFinish if true and the file is complete, then newSchema and
+ * chunkGroupMetadataList parameter will be not modified.
* @return the position of the file that is fine. All data after the position in the file should
* be truncated.
*/
@@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable {
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
- String magic = readHeadMagic();
- tsFileInput.position(headerLength);
- if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+ if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER
+ .equals(readVersionNumber())) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
+ tsFileInput.position(headerLength);
if (fileSize == headerLength) {
- return TsFileCheckStatus.ONLY_MAGIC_HEAD;
- } else if (readTailMagic().equals(magic)) {
+ return headerLength;
+ } else if (isComplete()) {
loadMetadataSize();
if (fastFinish) {
return TsFileCheckStatus.COMPLETE_FILE;
@@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
boolean newChunkGroup = true;
// not a complete file, we will recover it...
- long truncatedPosition = headerLength;
+ long truncatedSize = headerLength;
byte marker;
int chunkCnt = 0;
List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
@@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
newChunkGroup = true;
- truncatedPosition = this.position();
+ truncatedSize = this.position();
totalChunkNum += chunkCnt;
chunkCnt = 0;
@@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable {
case MetaMarker.VERSION:
long version = readVersion();
versionInfo.add(new Pair<>(position(), version));
- truncatedPosition = this.position();
+ truncatedSize = this.position();
break;
default:
// the disk file is corrupted, using this file may be dangerous
@@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable {
}
// now we read the tail of the data section, so we are sure that the last
// ChunkGroupFooter is complete.
- truncatedPosition = this.position() - 1;
+ truncatedSize = this.position() - 1;
} catch (Exception e) {
logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
file, this.position(), e.getMessage());
}
// Despite the completeness of the data section, we will discard current FileMetadata
// so that we can continue to write data into this tsfile.
- return truncatedPosition;
+ return truncatedSize;
}
public int getTotalChunkNum() {
@@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable {
* get device names which has valid chunks in [start, end)
*
* @param start start of the partition
- * @param end end of the partition
+ * @param end end of the partition
* @return device names in range
*/
public List<String> getDeviceNameInRange(long start, long end) throws IOException {
@@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* Check if the device has at least one Chunk in this partition
*
* @param seriesMetadataMap chunkMetaDataList of each measurement
- * @param start the start position of the space partition
- * @param end the end position of the space partition
+ * @param start the start position of the space partition
+ * @param end the end position of the space partition
*/
private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
long start, long end) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 1e6e105..8423dfb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput {
}
@Override
- public void truncate(long position) throws IOException {
- outputStream.getChannel().truncate(position);
+ public void truncate(long size) throws IOException {
+ outputStream.getChannel().truncate(size);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8a12b05..536f887 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory;
public class RestorableTsFileIOWriter extends TsFileIOWriter {
private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
- private long truncatedPosition = -1;
+ private long truncatedSize = -1;
private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
private int lastFlushedChunkGroupIndex = 0;
@@ -74,36 +73,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
// file doesn't exist
if (file.length() == 0) {
startFile();
+ canWrite = true;
+ crashed = true;
return;
}
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- // this tsfile is complete
- if (reader.isComplete()) {
+ truncatedSize = reader
+ .selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
+ totalChunkNum = reader.getTotalChunkNum();
+ if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
out.close();
- return;
- }
-
- // uncompleted file
- truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
- totalChunkNum = reader.getTotalChunkNum();
- if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+ } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
- } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
- crashed = true;
- out.truncate(
- (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
- .getBytes().length);
} else {
crashed = true;
+ canWrite = true;
// remove broken data
- out.truncate(truncatedPosition);
+ out.truncate(truncatedSize);
}
}
}
@@ -140,8 +133,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return new RestorableTsFileIOWriter(file);
}
- long getTruncatedPosition() {
- return truncatedPosition;
+ long getTruncatedSize() {
+ return truncatedSize;
}
public Map<Path, MeasurementSchema> getKnownSchema() {
@@ -162,7 +155,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
TSDataType dataType) {
List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+ if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId)
+ .containsKey(measurementId)) {
for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
// filter: if a device'measurement is defined as float type, and data has been persistent.
// Then someone deletes the timeseries and recreate it with Int type. We have to ignore
@@ -180,8 +174,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
/**
- * add all appendChunkMetadatas into memory. After calling this method, other classes can
- * read these metadata.
+ * add all appendChunkMetadatas into memory. After calling this method, other classes can read
+ * these metadata.
*/
public void makeMetadataVisible() {
@@ -212,8 +206,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
/**
- * get all the chunk's metadata which are appended after the last calling of this method, or
- * after the class instance is initialized if this is the first time to call the method.
+ * get all the chunk's metadata which are appended after the last calling of this method, or after
+ * the class instance is initialized if this is the first time to call the method.
*
* @return a list of Device ChunkMetadataList Pair
*/
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5b9fab1..8da1f17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -78,8 +78,8 @@ public interface TsFileOutput {
/**
* The same with {@link java.nio.channels.FileChannel#truncate(long)}.
*
- * @param position -position
+ * @param size The new size, a non-negative byte count
*/
- void truncate(long position) throws IOException;
+ void truncate(long size) throws IOException;
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 0c777c3..9b22991 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.constant.TestConstant;
import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -81,10 +82,12 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
- assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition());
+ assertEquals(
+ TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length,
+ rWriter.getTruncatedSize());
rWriter = new RestorableTsFileIOWriter(file);
- assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
+ assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize());
assertFalse(rWriter.canWrite());
rWriter.close();
assertTrue(file.delete());
@@ -101,7 +104,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
- rWriter.getTruncatedPosition());
+ rWriter.getTruncatedSize());
assertTrue(file.delete());
}
@@ -115,7 +118,7 @@ public class RestorableTsFileIOWriterTest {
TsFileWriter writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
- rWriter.getTruncatedPosition());
+ rWriter.getTruncatedSize());
assertTrue(file.delete());
}
@@ -133,7 +136,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
- rWriter.getTruncatedPosition());
+ rWriter.getTruncatedSize());
assertTrue(file.delete());
}
@@ -159,7 +162,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
// truncate version marker and version
- assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition());
+ assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize());
assertTrue(file.delete());
}
@@ -221,7 +224,7 @@ public class RestorableTsFileIOWriterTest {
writer.close();
assertNotEquals(
TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
- rWriter.getTruncatedPosition());
+ rWriter.getTruncatedSize());
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1"));
assertNotNull(chunkMetadataList);