You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/23 02:35:48 UTC
[incubator-iotdb] branch master updated: Fix recover bug with vm
(#1529)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a3df582 Fix recover bug with vm (#1529)
a3df582 is described below
commit a3df5827907069577ed6b5207687a370f87830c7
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Jul 23 10:35:34 2020 +0800
Fix recover bug with vm (#1529)
* fix recover vm bug
* fix show latest timeseries and add printStackTrace for CI
* change show latest timeseries way
Co-authored-by: 张凌哲 <44...@qq.com>
Co-authored-by: qiaojialin <64...@qq.com>
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 2 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 14 +-
.../iotdb/db/engine/flush/VmLogAnalyzer.java | 3 +-
.../org/apache/iotdb/db/engine/flush/VmLogger.java | 6 +
.../engine/storagegroup/StorageGroupProcessor.java | 85 +++++++----
.../db/engine/storagegroup/TsFileProcessor.java | 155 ++++++++++---------
.../org/apache/iotdb/db/metadata/MManager.java | 165 ++++++++++++---------
.../java/org/apache/iotdb/db/metadata/MTree.java | 39 +++--
.../writelog/recover/TsFileRecoverPerformer.java | 156 ++++++++++---------
.../db/engine/memtable/MemTableFlushTaskTest.java | 3 +-
.../recover/RecoverResourceFromReaderTest.java | 16 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 45 +++---
.../writelog/recover/UnseqTsFileRecoverTest.java | 16 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 2 +-
.../write/writer/RestorableTsFileIOWriter.java | 4 +
.../write/writer/RestorableTsFileIOWriterTest.java | 9 +-
16 files changed, 405 insertions(+), 315 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1846925..a11e944 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -166,7 +166,7 @@ public class StorageEngine implements IService {
logger.info("Storage Group Processor {} is recovered successfully",
storageGroup.getFullPath());
} catch (Exception e) {
- logger.error("meet error when recovering storage group: {}", storageGroup, e);
+ logger.error("meet error when recovering storage group: {}", storageGroup.getFullPath(), e);
}
return null;
}));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index d993f14..25c9878 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -52,6 +52,7 @@ public class MemTableFlushTask {
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
+ private RestorableTsFileIOWriter tsFileIOWriter;
private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
@@ -62,9 +63,18 @@ public class MemTableFlushTask {
private volatile boolean noMoreEncodingTask = false;
private volatile boolean noMoreIOTask = false;
- public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
+ /**
+ * @param memTable the memTable to flush
+ * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
+ * @param storageGroup current storage group
+ * @param tsFileIOWriter current tsfile writer (use it to create flushLog)
+ */
+
+ public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup,
+ RestorableTsFileIOWriter tsFileIOWriter) {
this.memTable = memTable;
this.writer = writer;
+ this.tsFileIOWriter = tsFileIOWriter;
this.storageGroup = storageGroup;
this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
@@ -80,7 +90,7 @@ public class MemTableFlushTask {
long start = System.currentTimeMillis();
long sortTime = 0;
- File flushLogFile = getFlushLogFile(writer);
+ File flushLogFile = getFlushLogFile(tsFileIOWriter);
if (!flushLogFile.createNewFile()) {
logger.error("Failed to create file {}", flushLogFile);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
index 67612b6..4f96078 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
@@ -54,8 +54,7 @@ public class VmLogAnalyzer {
public void analyze() throws IOException {
String currLine;
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
- currLine = bufferedReader.readLine();
- while (currLine != null) {
+ while ((currLine = bufferedReader.readLine()) != null) {
switch (currLine) {
case SOURCE_NAME:
currLine = bufferedReader.readLine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogger.java
index e075b4d..4bf4b6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogger.java
@@ -48,6 +48,12 @@ public class VmLogger {
logStream.close();
}
+ /**
+ * We need to log the offset of the device instead of depending on the selfCheck() func to do the
+ * truncation. Because if the current chunk group has been flushed to the tsfile but the device
+ * hasn't been logged into vmLog then selfCheck() func won't truncate the chunk group. When we do
+ * recovery, we will flush the device data again.
+ */
public void logDevice(String device, long offset) throws IOException {
logStream.write(String.format(FORMAT_DEVICE_OFFSET, device, offset));
logStream.newLine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index db890bc..f384496 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -604,12 +604,17 @@ public class StorageGroupProcessor {
}
private void recoverTsFiles(List<TsFileResource> tsFiles,
- Map<String, List<List<TsFileResource>>> vmFiles, boolean isSeq) {
+ Map<String, List<List<TsFileResource>>> vmFiles, boolean isSeq)
+ throws StorageGroupProcessorException {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
+
+ List<List<TsFileResource>> defaultVmTsFileResources = new ArrayList<>();
+ defaultVmTsFileResources.add(new ArrayList<>());
+
List<List<TsFileResource>> vmTsFileResources = vmFiles
- .getOrDefault(tsFileResource.getTsFilePath(), new ArrayList<>());
+ .getOrDefault(tsFileResource.getTsFilePath(), defaultVmTsFileResources);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
storageGroupName + FILE_NAME_SEPARATOR,
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
@@ -630,7 +635,24 @@ public class StorageGroupProcessor {
continue;
}
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
- // not the last file or cannot write, just close it
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableVm() && writer.canWrite()) {
+ // vm is enabled and the writer is not the last one, but it can still be written to,
+ // so we still need to recover it
+ TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+ vmTsFileResources, getVersionControllerByTimePartitionId(timePartitionId),
+ this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
+ isSeq, writer, vmWriters);
+ tsFileProcessor.recover();
+ // end the file if it is not the last file
+ try {
+ writer.endFile();
+ tsFileResource.cleanCloseFlag();
+ tsFileResource.serialize();
+ } catch (IOException e) {
+ throw new StorageGroupProcessorException(e);
+ }
+
+ }
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
// the last file is not closed, continue writing to it
@@ -639,8 +661,7 @@ public class StorageGroupProcessor {
tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
vmTsFileResources, getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
- false,
- writer, vmWriters);
+ true, writer, vmWriters);
tsFileProcessor.recover();
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
@@ -825,11 +846,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -887,10 +908,11 @@ public class StorageGroupProcessor {
// just for performance, because in single node version, we do not need the full path of measurement
// so, we want to avoid concat the device and measurement string in single node version
IoTDB.metaManager.updateLastCache(node.getFullPath(),
- plan.composeLastTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
+ plan.composeLastTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
} else {
- IoTDB.metaManager.updateLastCache(node.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurementList[i],
- plan.composeLastTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
+ IoTDB.metaManager
+ .updateLastCache(node.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurementList[i],
+ plan.composeLastTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
}
}
}
@@ -942,10 +964,11 @@ public class StorageGroupProcessor {
// just for performance, because in single node version, we do not need the full path of measurement
// so, we want to avoid concat the device and measurement string in single node version
IoTDB.metaManager.updateLastCache(node.getFullPath(),
- plan.composeTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
+ plan.composeTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
} else {
- IoTDB.metaManager.updateLastCache(node.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurementList[i],
- plan.composeTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
+ IoTDB.metaManager
+ .updateLastCache(node.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurementList[i],
+ plan.composeTimeValuePair(i), true, latestFlushedTime, tmpMeasurementNode);
}
}
}
@@ -977,10 +1000,10 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param fileList file list to add new processor
- * @param sequence whether is sequence or not
+ * @param fileList file list to add new processor
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1427,10 +1450,10 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
+ * @param startTime the startTime of delete range.
+ * @param endTime the endTime of delete range.
*/
public void delete(String deviceId, String measurementId, long startTime, long endTime)
throws IOException {
@@ -1973,7 +1996,8 @@ public class StorageGroupProcessor {
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
- newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
+ newTsFileResource
+ .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
}
}
loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2186,9 +2210,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2252,8 +2276,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
@@ -2264,9 +2288,10 @@ public class StorageGroupProcessor {
File targetFile;
switch (type) {
case LOAD_UNSEQUENCE:
- targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
- + tsFileResource.getTsFile().getName());
+ targetFile = fsFactory
+ .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ storageGroupName + File.separatorChar + filePartitionId + File.separator
+ + tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ded3d8b..75a2ea1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -23,7 +23,6 @@ import static org.apache.iotdb.db.engine.flush.VmLogger.SOURCE_NAME;
import static org.apache.iotdb.db.engine.flush.VmLogger.TARGET_NAME;
import static org.apache.iotdb.db.engine.flush.VmLogger.VM_LOG_NAME;
import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.getVmLevel;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.VM_SUFFIX;
@@ -230,9 +229,9 @@ public class TsFileProcessor {
* the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
*/
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
TSStatus[] results) throws WriteProcessException {
@@ -692,7 +691,11 @@ public class TsFileProcessor {
}
if (targetFile.getName().endsWith(TSFILE_SUFFIX)) {
if (!isMergeFinished) {
- writer.getIOWriterOut().truncate(offset - 1);
+ logger.info("{}: {} merge recover {} level vms to TsFile", storageGroupName,
+ tsFileResource.getTsFile().getName(), vmWriters.size());
+ if (offset > 0) {
+ writer.getIOWriterOut().truncate(offset - 1);
+ }
VmMergeUtils.merge(writer, packVmWritersToSequenceList(vmWriters),
storageGroupName,
new VmLogger(tsFileResource.getTsFile().getParent(),
@@ -716,33 +719,34 @@ public class TsFileProcessor {
newVmWriter.setFile(newVmFile);
}
} else {
- if (deviceSet.isEmpty()) {
- Files.delete(targetFile.toPath());
- } else {
+ logger.info("{}: {} [Hot Compaction Recover] merge level-{}'s {} vms to next level vm",
+ storageGroupName, tsFileResource.getTsFile().getName(), level,
+ sourceFileList.size());
+ if (offset > 0) {
newVmWriter.getIOWriterOut().truncate(offset - 1);
- // vm files must be sequence, so we just have to find the first file
- int startIndex = 0;
- for (startIndex = 0; startIndex < vmWriters.get(level).size(); startIndex++) {
- RestorableTsFileIOWriter levelVmWriter = vmWriters.get(level).get(startIndex);
- if (levelVmWriter.getFile().getAbsolutePath()
- .equals(sourceFileList.get(0).getAbsolutePath())) {
- break;
- }
- }
- List<RestorableTsFileIOWriter> levelVmWriters = new ArrayList<>(
- vmWriters.get(level).subList(startIndex, startIndex + sourceFileList.size()));
- List<TsFileResource> levelVmFiles = new ArrayList<>(
- vmTsFileResources.get(level)
- .subList(startIndex, startIndex + sourceFileList.size()));
- VmMergeUtils.merge(newVmWriter, levelVmWriters,
- storageGroupName,
- new VmLogger(tsFileResource.getTsFile().getParent(),
- tsFileResource.getTsFile().getName()),
- deviceSet, sequence);
- for (int i = 0; i < vmWriters.size(); i++) {
- deleteVmFiles(levelVmFiles, levelVmWriters);
+ }
+ // vm files must be sequence, so we just have to find the first file
+ int startIndex;
+ for (startIndex = 0; startIndex < vmWriters.get(level).size(); startIndex++) {
+ RestorableTsFileIOWriter levelVmWriter = vmWriters.get(level).get(startIndex);
+ if (levelVmWriter.getFile().getAbsolutePath()
+ .equals(sourceFileList.get(0).getAbsolutePath())) {
+ break;
}
}
+ List<RestorableTsFileIOWriter> levelVmWriters = new ArrayList<>(
+ vmWriters.get(level).subList(startIndex, startIndex + sourceFileList.size()));
+ List<TsFileResource> levelVmFiles = new ArrayList<>(
+ vmTsFileResources.get(level)
+ .subList(startIndex, startIndex + sourceFileList.size()));
+ VmMergeUtils.merge(newVmWriter, levelVmWriters,
+ storageGroupName,
+ new VmLogger(tsFileResource.getTsFile().getParent(),
+ tsFileResource.getTsFile().getName()),
+ deviceSet, sequence);
+ for (int i = 0; i < vmWriters.size(); i++) {
+ deleteVmFiles(levelVmFiles, levelVmWriters);
+ }
}
}
}
@@ -778,8 +782,8 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName());
File newVmFile = createNewVMFileWithLock(tsFileResource, 0);
if (vmWriters.isEmpty()) {
- vmWriters.add(new ArrayList<>());
- vmTsFileResources.add(new ArrayList<>());
+ vmWriters.add(new CopyOnWriteArrayList<>());
+ vmTsFileResources.add(new CopyOnWriteArrayList<>());
}
vmTsFileResources.get(0).add(new TsFileResource(newVmFile));
curWriter = new RestorableTsFileIOWriter(newVmFile);
@@ -790,7 +794,7 @@ public class TsFileProcessor {
curWriter = writer;
}
curWriter.mark();
- flushTask = new MemTableFlushTask(memTableToFlush, curWriter, storageGroupName);
+ flushTask = new MemTableFlushTask(memTableToFlush, curWriter, storageGroupName, writer);
flushTask.syncFlushMemTable();
} catch (Exception e) {
logger.error("{}: {} meet error when flushing a memtable, change system mode to read-only",
@@ -828,37 +832,41 @@ public class TsFileProcessor {
if (shouldClose && flushingMemTables.isEmpty()) {
try {
- // merge vm to tsfile
- while (true) {
- if (!mergeWorking) {
- break;
+ if (config.isEnableVm()) {
+ // merge vm to tsfile
+ while (true) {
+ if (!mergeWorking) {
+ break;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+ logger.error("{}: {}, closing task is interrupted.",
+ storageGroupName, tsFileResource.getTsFile().getName(), e);
+ // generally it is because the thread pool is shutdown so the task should be aborted
+ break;
+ }
}
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- logger.error("{}: {}, closing task is interrupted.",
- storageGroupName, tsFileResource.getTsFile().getName(), e);
- // generally it is because the thread pool is shutdown so the task should be aborted
- break;
+ logger.info("{}: [Hot Compaction] Start to merge total {} levels' vm to TsFile {}",
+ storageGroupName, vmTsFileResources.size() + 1, tsFileResource.getTsFile().getName());
+ long startTimeMillis = System.currentTimeMillis();
+ VmLogger vmLogger = new VmLogger(tsFileResource.getTsFile().getParent(),
+ tsFileResource.getTsFile().getName());
+ vmLogger.logFile(TARGET_NAME, writer.getFile());
+ flushAllVmToTsFile(vmWriters, vmTsFileResources, vmLogger);
+ vmLogger.logMergeFinish();
+ vmLogger.close();
+ File logFile = FSFactoryProducer.getFSFactory()
+ .getFile(tsFileResource.getTsFile().getParent(),
+ tsFileResource.getTsFile().getName() + VM_LOG_NAME);
+ if (logFile.exists()) {
+ Files.delete(logFile.toPath());
}
+ logger
+ .info("{}: [Hot Compaction] All vms are merged to TsFile {}, time consumption: {} ms",
+ storageGroupName, tsFileResource.getTsFile().getName(),
+ System.currentTimeMillis() - startTimeMillis);
}
- logger.info("{}: [Hot Compaction] Start to merge total {} levels' vm to TsFile {}",
- storageGroupName, vmTsFileResources.size() + 1, tsFileResource.getTsFile().getName());
- long startTimeMillis = System.currentTimeMillis();
- VmLogger vmLogger = new VmLogger(tsFileResource.getTsFile().getParent(),
- tsFileResource.getTsFile().getName());
- flushAllVmToTsFile(vmWriters, vmTsFileResources, vmLogger);
- vmLogger.logMergeFinish();
- vmLogger.close();
- File logFile = FSFactoryProducer.getFSFactory()
- .getFile(tsFileResource.getTsFile().getParent(),
- tsFileResource.getTsFile().getName() + VM_LOG_NAME);
- if (logFile.exists()) {
- Files.delete(logFile.toPath());
- }
- logger.info("{}: [Hot Compaction] All vms are merged to TsFile {}, time consumption: {} ms",
- storageGroupName, tsFileResource.getTsFile().getName(),
- System.currentTimeMillis() - startTimeMillis);
writer.mark();
try {
double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
@@ -912,7 +920,7 @@ public class TsFileProcessor {
}
} else {
// no other merge task working now, it's safe to submit one.
- if (!mergeWorking) {
+ if (config.isEnableVm() && !mergeWorking) {
mergeWorking = true;
logger.info("{}: {} submit a vm merge task", storageGroupName,
tsFileResource.getTsFile().getName());
@@ -1002,10 +1010,10 @@ public class TsFileProcessor {
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
*
- * @param deviceId device id
+ * @param deviceId device id
* @param measurementId measurements id
- * @param dataType data type
- * @param encoding encoding
+ * @param dataType data type
+ * @param encoding encoding
*/
public void query(String deviceId, String measurementId, TSDataType dataType, TSEncoding encoding,
Map<String, String> props, QueryContext context,
@@ -1059,18 +1067,9 @@ public class TsFileProcessor {
for (int j = 0; j < vmWriters.get(i).size(); j++) {
RestorableTsFileIOWriter vmWriter = vmWriters.get(i).get(j);
TsFileResource vmTsFileResource = vmTsFileResources.get(i).get(j);
- for (Entry<String, Map<String, List<ChunkMetadata>>> entry : vmWriter
- .getMetadatasForQuery()
- .entrySet()) {
- String device = entry.getKey();
- for (List<ChunkMetadata> tmpChunkMetadataList : entry.getValue().values()) {
- for (ChunkMetadata chunkMetadata : tmpChunkMetadataList) {
- vmTsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
- if (!sequence) {
- vmTsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
- }
- }
- }
+ vmTsFileResource.updateStartTime(deviceId, tsFileResource.getStartTime(deviceId));
+ if (!sequence) {
+ vmTsFileResource.updateEndTime(deviceId, tsFileResource.getEndTime(deviceId));
}
chunkMetadataList = vmWriter.getVisibleMetadataList(deviceId, measurementId, dataType);
QueryUtils.modifyChunkMetaData(chunkMetadataList,
@@ -1164,8 +1163,8 @@ public class TsFileProcessor {
long vmPointNum = 0;
// all flush to target file
Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
- for (List<RestorableTsFileIOWriter> subVmWriters : vmMergeWriters) {
- for (RestorableTsFileIOWriter vmWriter : subVmWriters) {
+ for (List<RestorableTsFileIOWriter> levelVmWriters : vmMergeWriters) {
+ for (RestorableTsFileIOWriter vmWriter : levelVmWriters) {
Map<String, Map<String, List<ChunkMetadata>>> schemaMap = vmWriter
.getMetadatasForQuery();
for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index fdb95c6..e2f3801 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,31 @@
*/
package org.apache.iotdb.db.metadata;
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
@@ -25,7 +50,12 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -36,6 +66,8 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.TestOnly;
@@ -54,20 +86,6 @@ import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.stream.Collectors.toList;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -166,6 +184,7 @@ public class MManager {
/**
* we should not use this function in other places, but only in the IoTDB class
*
* @return
*/
public static MManager getInstance() {
@@ -414,9 +433,9 @@ public class MManager {
/**
* Add one timeseries to metadata tree, if the timeseries already exists, throw exception
*
- * @param path the timeseries path
- * @param dataType the dateType {@code DataType} of the timeseries
- * @param encoding the encoding function {@code Encoding} of the timeseries
+ * @param path the timeseries path
+ * @param dataType the dataType {@code DataType} of the timeseries
+ * @param encoding the encoding function {@code Encoding} of the timeseries
* @param compressor the compressor function {@code Compressor} of the time series
* @return whether the measurement occurs for the first time in this storage group (if true, the
* measurement should be registered to the StorageEngine too)
@@ -680,7 +699,7 @@ public class MManager {
* Get all devices under given prefixPath.
*
* @param prefixPath a prefix of a full path. if the wildcard is not at the tail, then each
- * wildcard can only match one level, otherwise it can match to the tail.
+ * wildcard can only match one level, otherwise it can match to the tail.
* @return A HashSet instance which stores devices names with given prefixPath.
*/
public Set<String> getDevices(String prefixPath) throws MetadataException {
@@ -696,16 +715,17 @@ public class MManager {
* Get all nodes from the given level
*
* @param prefixPath can be a prefix of a full path. Can not be a full path. can not have
- * wildcard. But, the level of the prefixPath can be smaller than the given level, e.g.,
- * prefixPath = root.a while the given level is 5
- * @param nodeLevel the level can not be smaller than the level of the prefixPath
+ * wildcard. But, the level of the prefixPath can be smaller than the given
+ * level, e.g., prefixPath = root.a while the given level is 5
+ * @param nodeLevel the level can not be smaller than the level of the prefixPath
* @return A List instance which stores all node at given level
*/
public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException {
return getNodesList(prefixPath, nodeLevel, null);
}
- public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
+ public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter)
+ throws MetadataException {
lock.readLock().lock();
try {
return mtree.getNodesList(prefixPath, nodeLevel, filter);
@@ -759,7 +779,7 @@ public class MManager {
* expression in this method is formed by the amalgamation of seriesPath and the character '*'.
*
* @param prefixPath can be a prefix or a full path. if the wildcard is not at the tail, then each
- * wildcard can only match one level, otherwise it can match to the tail.
+ * wildcard can only match one level, otherwise it can match to the tail.
*/
public List<String> getAllTimeseriesName(String prefixPath) throws MetadataException {
lock.readLock().lock();
@@ -799,7 +819,7 @@ public class MManager {
* To calculate the count of nodes in the given level for given prefix path.
*
* @param prefixPath a prefix path or a full path, can not contain '*'
- * @param level the level can not be smaller than the level of the prefixPath
+ * @param level the level can not be smaller than the level of the prefixPath
*/
public int getNodesCountInGivenLevel(String prefixPath, int level) throws MetadataException {
lock.readLock().lock();
@@ -841,9 +861,11 @@ public class MManager {
// if ordered by heat, we sort all the timeseries by the descending order of the last insert timestamp
if (plan.isOrderByHeat()) {
- allMatchedNodes = allMatchedNodes.stream().sorted(
- Comparator.comparingLong(MTree::getLastTimeStamp).reversed()
- .thenComparing(MNode::getFullPath)).collect(toList());
+ QueryContext queryContext = new QueryContext(
+ QueryResourceManager.getInstance().assignQueryId(true));
+ allMatchedNodes = allMatchedNodes.stream().sorted(Comparator
+ .comparingLong((MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, queryContext))
+ .reversed().thenComparing(MNode::getFullPath)).collect(toList());
} else {
// otherwise, we just sort them by the alphabetical order
allMatchedNodes = allMatchedNodes.stream().sorted(Comparator.comparing(MNode::getFullPath))
@@ -1188,7 +1210,7 @@ public class MManager {
* Check whether the given path contains a storage group change or set the new offset of a
* timeseries
*
- * @param path timeseries
+ * @param path timeseries
* @param offset offset in the tag file
*/
public void changeOffset(String path, long offset) throws MetadataException {
@@ -1218,10 +1240,10 @@ public class MManager {
* upsert tags and attributes key-value for the timeseries if the key has existed, just use the
* new value to update it.
*
- * @param alias newly added alias
- * @param tagsMap newly added tags map
+ * @param alias newly added alias
+ * @param tagsMap newly added tags map
* @param attributesMap newly added attributes map
- * @param fullPath timeseries
+ * @param fullPath timeseries
*/
public void upsertTagsAndAttributes(String alias, Map<String, String> tagsMap,
Map<String, String> attributesMap, String fullPath) throws MetadataException, IOException {
@@ -1324,7 +1346,7 @@ public class MManager {
* add new attributes key-value for the timeseries
*
* @param attributesMap newly added attributes map
- * @param fullPath timeseries
+ * @param fullPath timeseries
*/
public void addAttributes(Map<String, String> attributesMap, String fullPath)
throws MetadataException, IOException {
@@ -1366,7 +1388,7 @@ public class MManager {
/**
* add new tags key-value for the timeseries
*
- * @param tagsMap newly added tags map
+ * @param tagsMap newly added tags map
* @param fullPath timeseries
*/
public void addTags(Map<String, String> tagsMap, String fullPath)
@@ -1419,7 +1441,7 @@ public class MManager {
/**
* drop tags or attributes of the timeseries
*
- * @param keySet tags key or attributes key
+ * @param keySet tags key or attributes key
* @param fullPath timeseries path
*/
public void dropTagsOrAttributes(Set<String> keySet, String fullPath)
@@ -1568,8 +1590,8 @@ public class MManager {
/**
* rename the tag or attribute's key of the timeseries
*
- * @param oldKey old key of tag or attribute
- * @param newKey new key of tag or attribute
+ * @param oldKey old key of tag or attribute
+ * @param newKey new key of tag or attribute
* @param fullPath timeseries
*/
public void renameTagOrAttributeKey(String oldKey, String newKey, String fullPath)
@@ -1668,7 +1690,8 @@ public class MManager {
}
}
- public void collectTimeseriesSchema(MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) {
+ public void collectTimeseriesSchema(MNode startingNode,
+ Collection<TimeseriesSchema> timeseriesSchemas) {
Deque<MNode> nodeDeque = new ArrayDeque<>();
nodeDeque.addLast(startingNode);
while (!nodeDeque.isEmpty()) {
@@ -1767,8 +1790,8 @@ public class MManager {
}
public void updateLastCache(String seriesPath, TimeValuePair timeValuePair,
- boolean highPriorityUpdate, Long latestFlushedTime,
- MeasurementMNode node) {
+ boolean highPriorityUpdate, Long latestFlushedTime,
+ MeasurementMNode node) {
if (node != null) {
node.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
} else {
@@ -1782,14 +1805,14 @@ public class MManager {
}
public TimeValuePair getLastCache(String seriesPath) {
- try {
- MeasurementMNode node = (MeasurementMNode) mtree.getNodeByPath(seriesPath);
- return node.getCachedLast();
- } catch (MetadataException e) {
- logger.warn("failed to get last cache for the {}, err:{}", seriesPath, e.getMessage());
+ try {
+ MeasurementMNode node = (MeasurementMNode) mtree.getNodeByPath(seriesPath);
+ return node.getCachedLast();
+ } catch (MetadataException e) {
+ logger.warn("failed to get last cache for the {}, err:{}", seriesPath, e.getMessage());
+ }
+ return null;
}
- return null;
-}
private void checkMTreeModified() {
if (logWriter == null || logFile == null) {
@@ -1844,8 +1867,8 @@ public class MManager {
}
/**
- * get schema for device.
- * Attention!!! Only support insertPlan
+ * get schema for device. Attention!!! Only support insertPlan
+ *
* @throws MetadataException
*/
public MeasurementSchema[] getSeriesSchemasAndReadLockDevice(String deviceId,
@@ -1864,7 +1887,8 @@ public class MManager {
// could not create it
if (!config.isAutoCreateSchemaEnabled()) {
throw new MetadataException(String.format(
- "Current deviceId[%s] does not contain measurement:%s", deviceId, measurementList[i]));
+ "Current deviceId[%s] does not contain measurement:%s", deviceId,
+ measurementList[i]));
}
// create it
@@ -1872,19 +1896,20 @@ public class MManager {
TSDataType dataType = getTypeInLoc(plan, i);
createTimeseries(
- path.getFullPath(),
- dataType,
- getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
+ path.getFullPath(),
+ dataType,
+ getDefaultEncoding(dataType),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
}
- MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode, measurementList[i]);
+ MeasurementMNode measurementNode = (MeasurementMNode) getChild(deviceNode,
+ measurementList[i]);
// check type is match
TSDataType insertDataType = null;
if (plan instanceof InsertRowPlan) {
- if (!((InsertRowPlan)plan).isNeedInferType()) {
+ if (!((InsertRowPlan) plan).isNeedInferType()) {
// only when InsertRowPlan's values is object[], we should check type
insertDataType = getTypeInLoc(plan, i);
} else {
@@ -1896,11 +1921,11 @@ public class MManager {
if (measurementNode.getSchema().getType() != insertDataType) {
logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
- measurementList[i], insertDataType, measurementNode.getSchema().getType());
+ measurementList[i], insertDataType, measurementNode.getSchema().getType());
if (!config.isEnablePartialInsert()) {
throw new MetadataException(String.format(
- "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
- measurementList[i], insertDataType, measurementNode.getSchema().getType()));
+ "DataType mismatch, Insert measurement %s type %s, metadata tree type %s",
+ measurementList[i], insertDataType, measurementNode.getSchema().getType()));
} else {
// mark failed measurement
plan.markFailedMeasurementInsertion(i);
@@ -1914,7 +1939,7 @@ public class MManager {
}
} catch (MetadataException e) {
logger.warn("meet error when check {}.{}, message: {}", deviceId, measurementList[i],
- e.getMessage());
+ e.getMessage());
if (config.isEnablePartialInsert()) {
// mark failed measurement
plan.markFailedMeasurementInsertion(i);
@@ -1949,32 +1974,34 @@ public class MManager {
return conf.getDefaultTextEncoding();
default:
throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataType.toString()));
+ String.format("Data type %s is not supported.", dataType.toString()));
}
}
/**
- * get dataType of plan, in loc measurements
- * only support InsertRowPlan and InsertTabletPlan
+ * get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan
+ *
* @throws MetadataException
*/
private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
TSDataType dataType;
if (plan instanceof InsertRowPlan) {
InsertRowPlan tPlan = (InsertRowPlan) plan;
- dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+ dataType = TypeInferenceUtils
+ .getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
} else if (plan instanceof InsertTabletPlan) {
dataType = (plan).getDataTypes()[loc];
- } else {
+ } else {
throw new MetadataException(String.format(
- "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+ "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
}
return dataType;
}
/**
- * when insert, we lock device node for not create deleted time series
- * after insert, we should call this function to unlock the device node
+ * when insert, we lock device node for not create deleted time series after insert, we should
+ * call this function to unlock the device node
+ *
* @param deviceId
*/
public void unlockDeviceReadLock(String deviceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 431c35b..2a7c144 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -651,7 +652,10 @@ public class MTree implements Serializable {
}
List<String[]> allMatchedNodes = new ArrayList<>();
- findPath(root, nodes, 1, allMatchedNodes, false, true);
+ QueryContext queryContext = new QueryContext(
+ QueryResourceManager.getInstance().assignQueryId(true));
+
+ findPath(root, nodes, 1, allMatchedNodes, false, true, queryContext);
Stream<String[]> sortedStream = allMatchedNodes.stream().sorted(
Comparator.comparingLong((String[] s) -> Long.parseLong(s[7])).reversed()
@@ -683,10 +687,10 @@ public class MTree implements Serializable {
count.set(0);
if (offset.get() != 0 || limit.get() != 0) {
res = new LinkedList<>();
- findPath(root, nodes, 1, res, true, false);
+ findPath(root, nodes, 1, res, true, false, null);
} else {
res = new LinkedList<>();
- findPath(root, nodes, 1, res, false, false);
+ findPath(root, nodes, 1, res, false, false, null);
}
// avoid memory leaks
limit.remove();
@@ -704,7 +708,7 @@ public class MTree implements Serializable {
* dataType, encoding, compression, offset, lastTimeStamp]
*/
private void findPath(MNode node, String[] nodes, int idx, List<String[]> timeseriesSchemaList,
- boolean hasLimit, boolean needLast) throws MetadataException {
+ boolean hasLimit, boolean needLast, QueryContext queryContext) throws MetadataException {
if (node instanceof MeasurementMNode && nodes.length <= idx) {
if (hasLimit) {
curOffset.set(curOffset.get() + 1);
@@ -728,7 +732,8 @@ public class MTree implements Serializable {
tsRow[4] = measurementSchema.getEncodingType().toString();
tsRow[5] = measurementSchema.getCompressor().toString();
tsRow[6] = String.valueOf(((MeasurementMNode) node).getOffset());
- tsRow[7] = needLast ? String.valueOf(getLastTimeStamp((MeasurementMNode) node)) : null;
+ tsRow[7] =
+ needLast ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext)) : null;
timeseriesSchemaList.add(tsRow);
if (hasLimit) {
@@ -738,14 +743,15 @@ public class MTree implements Serializable {
String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes);
if (!nodeReg.contains(PATH_WILDCARD)) {
if (node.hasChild(nodeReg)) {
- findPath(node.getChild(nodeReg), nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast);
+ findPath(node.getChild(nodeReg), nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast,
+ queryContext);
}
} else {
for (MNode child : node.getChildren().values()) {
if (!Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
continue;
}
- findPath(child, nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast);
+ findPath(child, nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast, queryContext);
if (hasLimit) {
if (count.get().intValue() == limit.get().intValue()) {
return;
@@ -755,16 +761,18 @@ public class MTree implements Serializable {
}
}
- static long getLastTimeStamp(MeasurementMNode node) {
+ static long getLastTimeStamp(MeasurementMNode node, QueryContext queryContext) {
TimeValuePair last = node.getCachedLast();
if (last != null) {
return node.getCachedLast().getTimestamp();
} else {
try {
last = calculateLastPairForOneSeriesLocally(new Path(node.getFullPath()),
- node.getSchema().getType(), new QueryContext(-1), Collections.emptySet());
+ node.getSchema().getType(), queryContext, Collections.emptySet());
return last.getTimestamp();
} catch (Exception e) {
+ logger.error("Something wrong happened while trying to get last time value pair of {}",
+ node.getFullPath(), e);
return Long.MIN_VALUE;
}
}
@@ -885,8 +893,11 @@ public class MTree implements Serializable {
return getNodesList(path, nodeLevel, null);
}
- /** Get all paths from root to the given level */
- List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter) throws MetadataException {
+ /**
+ * Get all paths from root to the given level
+ */
+ List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter)
+ throws MetadataException {
String[] nodes = MetaUtils.getNodeNames(path);
if (!nodes[0].equals(root.getName())) {
throw new IllegalPathException(path);
@@ -896,7 +907,8 @@ public class MTree implements Serializable {
for (int i = 1; i < nodes.length; i++) {
if (node.getChild(nodes[i]) != null) {
node = node.getChild(nodes[i]);
- if (node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
+ if (node instanceof StorageGroupMNode && filter != null && !filter
+ .satisfy(node.getFullPath())) {
return res;
}
} else {
@@ -914,7 +926,8 @@ public class MTree implements Serializable {
*/
private void findNodes(MNode node, String path, List<String> res, int targetLevel,
StorageGroupFilter filter) {
- if (node == null || node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) {
+ if (node == null || node instanceof StorageGroupMNode && filter != null && !filter
+ .satisfy(node.getFullPath())) {
return;
}
if (targetLevel == 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index d75de25..128eba5 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.writelog.recover;
import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
-import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
@@ -34,6 +33,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -64,7 +64,7 @@ public class TsFileRecoverPerformer {
private String filePath;
private String logNodePrefix;
private VersionController versionController;
- private TsFileResource resource;
+ private TsFileResource tsFileResource;
private boolean sequence;
private boolean isLastFile;
@@ -80,7 +80,7 @@ public class TsFileRecoverPerformer {
this.filePath = currentTsFileResource.getTsFilePath();
this.logNodePrefix = logNodePrefix;
this.versionController = versionController;
- this.resource = currentTsFileResource;
+ this.tsFileResource = currentTsFileResource;
this.sequence = sequence;
this.isLastFile = isLastFile;
this.vmTsFileResources = vmTsFileResources;
@@ -110,16 +110,11 @@ public class TsFileRecoverPerformer {
logger.error("TsFile {} is missing, will skip its recovery.", filePath);
return null;
}
- for (List<File> subVmFileList : vmFileList) {
- for (File vmFile : subVmFileList) {
- if (!vmFile.exists()) {
- logger.error("VMFile {} is missing, will skip its recovery.", vmFile.getPath());
- return null;
- }
- }
- }
+
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
+
+ // create RestorableTsFileIOWriter for all vmfiles in each level
List<List<RestorableTsFileIOWriter>> vmRestorableTsFileIOWriterList = new ArrayList<>();
try {
restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
@@ -127,7 +122,10 @@ public class TsFileRecoverPerformer {
vmRestorableTsFileIOWriterList.add(new ArrayList<>());
for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
file = vmFileList.get(i).get(j);
- vmRestorableTsFileIOWriterList.get(i).add(new RestorableTsFileIOWriter(file));
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(file);
+ // recover vmfiles' metadata
+ recoverResourceFromWriter(writer, vmTsFileResources.get(i).get(j));
+ vmRestorableTsFileIOWriterList.get(i).add(writer);
}
}
} catch (NotCompatibleTsFileException e) {
@@ -138,21 +136,11 @@ public class TsFileRecoverPerformer {
throw new StorageGroupProcessorException(e);
}
- RestorableTsFileIOWriter lastRestorableTsFileIOWriter =
- vmTsFileResources.isEmpty() ? restorableTsFileIOWriter
- : vmRestorableTsFileIOWriterList.get(0)
- .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
-
- TsFileResource lastTsFileResource = vmTsFileResources.isEmpty() ? resource
- : vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
-
- boolean isComplete =
- !lastRestorableTsFileIOWriter.hasCrashed() && !lastRestorableTsFileIOWriter.canWrite();
-
- if (isComplete) {
- // tsfile is complete, vmfile is never complete because it's canWrite() always return true.
+ // judge whether tsfile is complete
+ if (!restorableTsFileIOWriter.hasCrashed()) {
+ // if tsfile is complete, then there is no vmfiles.
try {
- recoverResource(resource);
+ recoverResource(tsFileResource);
return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
} catch (IOException e) {
throw new StorageGroupProcessorException(
@@ -160,61 +148,67 @@ public class TsFileRecoverPerformer {
+ RESOURCE_SUFFIX + e);
}
} else {
- if (!vmTsFileResources.isEmpty()) {
- for (int i = 0; i < vmTsFileResources.size(); i++) {
- for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
- recoverResourceFromWriter(vmRestorableTsFileIOWriterList.get(i).get(j),
- vmTsFileResources.get(i).get(j));
- }
- }
- recoverResourceFromWriter(restorableTsFileIOWriter, resource);
- boolean vmFileNotCrashed = !getFlushLogFile(restorableTsFileIOWriter).exists();
- // if the last file in vmTsFileResources is not crashed
- if (vmFileNotCrashed) {
- try {
- boolean tsFileNotCrashed = !isVMLoggerFileExist(restorableTsFileIOWriter);
- // tsfile is not crash
- if (tsFileNotCrashed) {
+ // tsfile has crashed
+ // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+ // map must be updated first to avoid duplicated insertion
+ recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+ }
+
+ // If the vm is not enable, the walTargetWriter points to the tsfile.
+ // If the vm is enable and flush log exists, the walTargetWriter points to the vm of the flush log
+ // if the vm is enable and flush log does not exist, the walTargetWriter is null.
+ RestorableTsFileIOWriter walTargetWriter = null;
+ TsFileResource walTargetResource = null;
- // if wal exists, we should open a new vmfile to replay it
- File newVmFile = createNewVMFile(resource, 0);
- TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
- RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
- if (redoLogs(newVMWriter, newVmTsFileResource)) {
- vmTsFileResources.get(0).add(newVmTsFileResource);
- vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
- } else {
- Files.delete(newVmFile.toPath());
- }
- } else {
- IMemTable recoverMemTable = new PrimitiveMemTable();
- recoverMemTable.setVersion(versionController.nextVersion());
- LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath,
- resource.getModFile(),
- versionController, resource, recoverMemTable, sequence);
- logReplayer.replayLogs();
- }
- // clean logs
- MultiFileLogNodeManager.getInstance().deleteNode(
- logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
- updateTsFileResource();
- return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
- } catch (IOException e) {
- throw new StorageGroupProcessorException(
- "recover the resource file failed: " + filePath
- + RESOURCE_SUFFIX + e);
- }
+ boolean enableVM = IoTDBDescriptor.getInstance().getConfig().isEnableVm();
+ boolean flushLogExist = getFlushLogFile(restorableTsFileIOWriter).exists();
+
+ if (!enableVM) {
+ walTargetWriter = restorableTsFileIOWriter;
+ walTargetResource = tsFileResource;
+ } else if (flushLogExist) {
+ walTargetWriter = vmRestorableTsFileIOWriterList.get(0)
+ .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
+ walTargetResource = vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
+ // prevent generating resource file
+ isLastFile = true;
+ }
+
+
+ if (walTargetWriter == null) {
+ try {
+ // if wal exists, we should open a new vmfile to replay it
+ File newVmFile = createNewVMFile(tsFileResource, 0);
+ TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
+ RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
+ isLastFile = true;
+ // prevent generating resource file
+ if (redoLogs(newVMWriter, newVmTsFileResource, restorableTsFileIOWriter)) {
+ // close output stream of newVMWriter
+ newVMWriter.close();
+ // recover metadata for new vmfile
+ recoverResourceFromWriter(newVMWriter, newVmTsFileResource);
+ vmTsFileResources.get(0).add(newVmTsFileResource);
+ vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
+ // clean logs
+ MultiFileLogNodeManager.getInstance().deleteNode(
+ logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+ } else {
+ newVMWriter.close();
+ Files.delete(newVmFile.toPath());
}
- } else {
- // tsfile has crashed
- // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
- // map must be updated first to avoid duplicated insertion
- recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
+ updateTsFileResource();
+ return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
+ } catch (IOException e) {
+ logger.error("recover the resource file failed: {}", filePath + RESOURCE_SUFFIX, e);
+ throw new StorageGroupProcessorException(
+ "recover the resource file failed: " + filePath
+ + RESOURCE_SUFFIX + e);
}
}
// redo logs
- redoLogs(lastRestorableTsFileIOWriter, lastTsFileResource);
+ redoLogs(walTargetWriter, walTargetResource, restorableTsFileIOWriter);
// clean logs
try {
@@ -229,12 +223,12 @@ public class TsFileRecoverPerformer {
private void updateTsFileResource() {
for (List<TsFileResource> subTsFileResources : vmTsFileResources) {
- for (TsFileResource tsFileResource : subTsFileResources) {
- for (Entry<String, Integer> entry : tsFileResource.getDeviceToIndexMap().entrySet()) {
+ for (TsFileResource resource : subTsFileResources) {
+ for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
String device = entry.getKey();
int index = entry.getValue();
- resource.updateStartTime(device, tsFileResource.getStartTime(index));
- resource.updateEndTime(device, tsFileResource.getEndTime(index));
+ tsFileResource.updateStartTime(device, resource.getStartTime(index));
+ tsFileResource.updateEndTime(device, resource.getEndTime(index));
}
}
}
@@ -306,7 +300,7 @@ public class TsFileRecoverPerformer {
}
private boolean redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
- TsFileResource tsFileResource) throws StorageGroupProcessorException {
+ TsFileResource tsFileResource, RestorableTsFileIOWriter tsFileIOWriter) throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
recoverMemTable.setVersion(versionController.nextVersion());
LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
@@ -318,7 +312,7 @@ public class TsFileRecoverPerformer {
// flush logs
MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
restorableTsFileIOWriter,
- tsFileResource.getTsFile().getParentFile().getParentFile().getName());
+ tsFileResource.getTsFile().getParentFile().getParentFile().getName(), tsFileIOWriter);
tableFlushTask.syncFlushMemTable();
res = true;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index 31130d4..f39ea74 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
@@ -70,7 +69,7 @@ public class MemTableFlushTaskTest {
MemTableTestUtils.produceData(memTable, startTime, endTime, MemTableTestUtils.deviceId0,
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+ MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup, writer);
assertTrue(writer
.getVisibleMetadataList(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0).isEmpty());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index d64bcf7..fad1beb 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.db.writelog.recover;
import static org.junit.Assert.assertEquals;
+import java.util.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
@@ -180,16 +180,16 @@ public class RecoverResourceFromReaderTest {
ReadWriteIOUtils.write(123, outputStream);
}
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
- versionController, resource, false, false, Collections.emptyList());
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ resource, false, false, Collections.singletonList(new ArrayList<>()));
ActiveTimeSeriesCounter.getInstance()
.init(resource.getTsFile().getParentFile().getParentFile().getName());
- performer.recover();
- assertEquals(1, (long) resource.getStartTime("root.sg.device99"));
- assertEquals(300, (long) resource.getEndTime("root.sg.device99"));
+ performer.recover().left.close();
+ assertEquals(1, resource.getStartTime("root.sg.device99"));
+ assertEquals(300, resource.getEndTime("root.sg.device99"));
for (int i = 0; i < 10; i++) {
- assertEquals(0, (long) resource.getStartTime("root.sg.device" + i));
- assertEquals(9, (long) resource.getEndTime("root.sg.device" + i));
+ assertEquals(0, resource.getStartTime("root.sg.device" + i));
+ assertEquals(9, resource.getEndTime("root.sg.device" + i));
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 4c4b372..d6b4fef 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -19,7 +19,19 @@
package org.apache.iotdb.db.writelog.recover;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -55,12 +67,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import static org.junit.Assert.*;
-
public class SeqTsFileRecoverTest {
private File tsF;
@@ -87,6 +93,7 @@ public class SeqTsFileRecoverTest {
@Before
public void setup() throws IOException, WriteProcessException, MetadataException {
EnvironmentUtils.envSetUp();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVm(false);
tsF = SystemFileFactory.INSTANCE.getFile(logNodePrefix, "1-1-1.tsfile");
tsF.getParentFile().mkdirs();
@@ -157,6 +164,7 @@ public class SeqTsFileRecoverTest {
@After
public void tearDown() throws IOException, StorageEngineException {
EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVm(true);
FileUtils.deleteDirectory(tsF.getParentFile());
resource.close();
node.delete();
@@ -164,17 +172,18 @@ public class SeqTsFileRecoverTest {
@Test
public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
- versionController, resource, false, false, Collections.emptyList());
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ resource, false, false, Collections.singletonList(new ArrayList<>()));
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
RestorableTsFileIOWriter writer = performer.recover().left;
assertFalse(writer.canWrite());
+ writer.close();
- assertEquals(2, (long) resource.getStartTime("root.sg.device99"));
- assertEquals(100, (long) resource.getEndTime("root.sg.device99"));
+ assertEquals(2, resource.getStartTime("root.sg.device99"));
+ assertEquals(100, resource.getEndTime("root.sg.device99"));
for (int i = 0; i < 10; i++) {
- assertEquals(0, (long) resource.getStartTime("root.sg.device" + i));
- assertEquals(19, (long) resource.getEndTime("root.sg.device" + i));
+ assertEquals(0, resource.getStartTime("root.sg.device" + i));
+ assertEquals(19, resource.getEndTime("root.sg.device" + i));
}
ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
@@ -213,8 +222,8 @@ public class SeqTsFileRecoverTest {
@Test
public void testLastRecovery() throws StorageGroupProcessorException, IOException {
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
- versionController, resource, false, true, Collections.emptyList());
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ resource, false, true, Collections.singletonList(new ArrayList<>()));
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
RestorableTsFileIOWriter writer = performer.recover().left;
@@ -224,11 +233,11 @@ public class SeqTsFileRecoverTest {
assertTrue(writer.canWrite());
writer.endFile();
- assertEquals(2, (long) resource.getStartTime("root.sg.device99"));
- assertEquals(100, (long) resource.getEndTime("root.sg.device99"));
+ assertEquals(2, resource.getStartTime("root.sg.device99"));
+ assertEquals(100, resource.getEndTime("root.sg.device99"));
for (int i = 0; i < 10; i++) {
- assertEquals(0, (long) resource.getStartTime("root.sg.device" + i));
- assertEquals(19, (long) resource.getEndTime("root.sg.device" + i));
+ assertEquals(0, resource.getStartTime("root.sg.device" + i));
+ assertEquals(19, resource.getEndTime("root.sg.device" + i));
}
ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 2a03f83..13717e1 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -19,7 +19,13 @@
package org.apache.iotdb.db.writelog.recover;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -58,12 +64,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
public class UnseqTsFileRecoverTest {
private File tsF;
@@ -89,6 +89,7 @@ public class UnseqTsFileRecoverTest {
@Before
public void setup() throws IOException, WriteProcessException, MetadataException {
EnvironmentUtils.envSetUp();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVm(false);
tsF = SystemFileFactory.INSTANCE.getFile(logNodePrefix, "1-1-1.tsfile");
tsF.getParentFile().mkdirs();
@@ -173,6 +174,7 @@ public class UnseqTsFileRecoverTest {
FileUtils.deleteDirectory(tsF.getParentFile());
resource.close();
node.delete();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVm(true);
EnvironmentUtils.cleanEnv();
}
@@ -182,7 +184,7 @@ public class UnseqTsFileRecoverTest {
versionController, resource, false, false, Collections.emptyList());
ActiveTimeSeriesCounter.getInstance()
.init(resource.getTsFile().getParentFile().getParentFile().getName());
- performer.recover();
+ performer.recover().left.close();
assertEquals(1, (long) resource.getStartTime("root.sg.device99"));
assertEquals(300, (long) resource.getEndTime("root.sg.device99"));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 0cf8972..e72230a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -851,7 +851,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
boolean newChunkGroup = true;
// not a complete file, we will recover it...
- long truncatedPosition = TSFileConfig.MAGIC_STRING.getBytes().length;
+ long truncatedPosition = headerLength;
byte marker;
int chunkCnt = 0;
List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index ef1b915..3586f93 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -83,6 +83,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
// file doesn't exist
if (file.length() == 0) {
startFile();
+ crashed = true;
+ canWrite = true;
return;
}
@@ -106,11 +108,13 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
crashed = true;
+ canWrite = true;
out.truncate(
(long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length);
} else {
crashed = true;
+ canWrite = true;
// remove broken data
out.truncate(truncatedPosition);
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 255ab84..98def4c 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -100,7 +100,8 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
+ rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@@ -113,7 +114,8 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
TsFileWriter writer = new TsFileWriter(rWriter);
writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
+ rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@@ -130,7 +132,8 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
+ rWriter.getTruncatedPosition());
assertTrue(file.delete());
}