You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/01/22 01:38:34 UTC
[incubator-iotdb] branch delete_dev4 updated: add roll back
mechanism
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch delete_dev4
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/delete_dev4 by this push:
new 3f5bbb0 add roll back mechanism
3f5bbb0 is described below
commit 3f5bbb095df0ce0b1f45f52bfe2ec14f598f0596
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jan 22 09:37:53 2019 +0800
add roll back mechanism
---
.../db/engine/filenode/FileNodeProcessor.java | 488 +++++++++++----------
.../db/engine/modification/ModificationFile.java | 17 +-
.../io/LocalTextModificationAccessor.java | 17 +-
.../engine/modification/io/ModificationWriter.java | 5 +
.../db/engine/overflow/ioV2/OverflowProcessor.java | 13 +-
.../db/engine/overflow/ioV2/OverflowResource.java | 10 +-
.../engine/modification/ModificationFileTest.java | 36 ++
7 files changed, 336 insertions(+), 250 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index f810273..1ac6e42 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1,6 +1,6 @@
/**
* Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
- *
+ * <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,7 +26,6 @@ import java.nio.file.Files;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -74,7 +73,6 @@ import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
@@ -118,7 +116,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private final HashMap<String, AtomicLong> statParamsHashMap = new HashMap<String, AtomicLong>() {
{
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
put(statConstant.name(), new AtomicLong(0));
}
}
@@ -217,16 +215,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* constructor of FileNodeProcessor.
*/
public FileNodeProcessor(String fileNodeDirPath, String processorName)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
super(processorName);
statStorageDeltaName =
- MonitorConstants.statStorageGroupPrefix + MonitorConstants.MONITOR_PATH_SEPERATOR
- + MonitorConstants.fileNodePath + MonitorConstants.MONITOR_PATH_SEPERATOR
- + processorName.replaceAll("\\.", "_");
+ MonitorConstants.statStorageGroupPrefix + MonitorConstants.MONITOR_PATH_SEPERATOR
+ + MonitorConstants.fileNodePath + MonitorConstants.MONITOR_PATH_SEPERATOR
+ + processorName.replaceAll("\\.", "_");
this.parameters = new HashMap<>();
if (fileNodeDirPath.length() > 0
- && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) != File.separatorChar) {
+ && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) != File.separatorChar) {
fileNodeDirPath = fileNodeDirPath + File.separatorChar;
}
this.baseDirPath = fileNodeDirPath + processorName;
@@ -234,16 +232,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (!dataDir.exists()) {
dataDir.mkdirs();
LOGGER.info(
- "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
- getProcessorName(), baseDirPath);
+ "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
+ getProcessorName(), baseDirPath);
}
fileNodeRestoreFilePath = new File(dataDir, processorName + RESTORE_FILE_SUFFIX).getPath();
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "The fileNode processor {} encountered an error when recoverying restore information.",
- processorName, e);
+ "The fileNode processor {} encountered an error when recoverying restore information.",
+ processorName, e);
throw new FileNodeProcessorException(e);
}
// TODO deep clone the lastupdate time
@@ -266,7 +264,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
// status is not NONE, or the last intervalFile is not closed
if (isMerging != FileNodeProcessorStatus.NONE
- || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
+ || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
shouldRecovery = true;
} else {
// add file into the index of file
@@ -294,9 +292,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
HashMap<String, String> hashMap = new HashMap<String, String>() {
{
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
- MonitorConstants.DataType);
+ MonitorConstants.DataType);
}
}
};
@@ -307,9 +305,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public List<String> getAllPathForStatistic() {
List<String> list = new ArrayList<>();
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
- MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
list.add(
- statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
+ statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
}
return list;
}
@@ -348,10 +346,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* add interval FileNode.
*/
public void addIntervalFileNode(long startTime, String baseDir, String fileName)
- throws Exception {
+ throws Exception {
IntervalFileNode intervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
- fileName);
+ fileName);
this.currentIntervalFileNode = intervalFileNode;
newFileNodes.add(intervalFileNode);
fileNodeProcessorStore.setNewFileNodes(newFileNodes);
@@ -446,29 +444,29 @@ public class FileNodeProcessor extends Processor implements IStatistic {
parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
String baseDir = directories
- .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
+ .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
LOGGER.info(
- "The filenode processor {} will recovery the bufferwrite processor, "
- + "the bufferwrite file is {}",
- getProcessorName(), fileNames[fileNames.length - 1]);
+ "The filenode processor {} will recovery the bufferwrite processor, "
+ + "the bufferwrite file is {}",
+ getProcessorName(), fileNames[fileNames.length - 1]);
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
- fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
+ fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
// unlock
writeUnlock();
LOGGER.error(
- "The filenode processor {} failed to recovery the bufferwrite processor, "
- + "the last bufferwrite file is {}.",
- getProcessorName(), fileNames[fileNames.length - 1]);
+ "The filenode processor {} failed to recovery the bufferwrite processor, "
+ + "the last bufferwrite file is {}.",
+ getProcessorName(), fileNames[fileNames.length - 1]);
throw new FileNodeProcessorException(e);
}
}
// restore the overflow processor
LOGGER.info("The filenode processor {} will recovery the overflow processor.",
- getProcessorName());
+ getProcessorName());
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
try {
@@ -477,7 +475,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
} catch (IOException e) {
writeUnlock();
LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
- getProcessorName());
+ getProcessorName());
throw new FileNodeProcessorException(e);
}
@@ -487,14 +485,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// re-merge all file
// if bufferwrite processor is not null, and close
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
merge();
} else if (isMerging == FileNodeProcessorStatus.WAITING) {
// unlock
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
writeUnlock();
switchWaitingToWorkingv2(newFileNodes);
} else {
@@ -508,23 +506,23 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* get buffer write processor by processor name and insert time.
*/
public BufferWriteProcessor getBufferWriteProcessor(String processorName, long insertTime)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
if (bufferWriteProcessor == null) {
Map<String, Action> parameters = new HashMap<>();
parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
String baseDir = directories.getNextFolderForTsfile();
LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
// construct processor or restore
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
- insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
- parameters, versionController, fileSchema);
+ insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
+ parameters, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
- processorName, e);
+ processorName, e);
throw new FileNodeProcessorException(e);
}
}
@@ -551,7 +549,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// construct processor or restore
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema,
versionController);
}
@@ -623,9 +621,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public void changeTypeToChanged(String deviceId, long timestamp) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the filenode processor {}, "
- + "the data is [device:{},time:{}]",
- getProcessorName(), deviceId, timestamp);
+ "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+ + "the data is [device:{},time:{}]",
+ getProcessorName(), deviceId, timestamp);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -645,9 +643,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public void changeTypeToChanged(String deviceId, long startTime, long endTime) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the filenode processor {}, "
- + "the data is [device:{}, start time:{}, end time:{}]",
- getProcessorName(), deviceId, startTime, endTime);
+ "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+ + "the data is [device:{}, start time:{}, end time:{}]",
+ getProcessorName(), deviceId, startTime, endTime);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -670,9 +668,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
if (!invertedindexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
- "Can not find any tsfile which will be overflowed in the filenode processor {}, "
- + "the data is [device:{}, delete time:{}]",
- getProcessorName(), deviceId, timestamp);
+ "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+ + "the data is [device:{}, delete time:{}]",
+ getProcessorName(), deviceId, timestamp);
emptyIntervalFileNode.setStartTime(deviceId, 0L);
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -694,7 +692,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* @return index of interval
*/
private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
- List<IntervalFileNode> fileList) {
+ List<IntervalFileNode> fileList) {
int index = 1;
while (index < fileList.size()) {
if (timestamp < fileList.get(index).getStartTime(deviceId)) {
@@ -728,19 +726,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
newMultiPassLock.readLock().unlock();
newMultiPassTokenSet.remove(token);
LOGGER
- .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
- newMultiPassTokenSet, newMultiPassLock);
+ .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
+ newMultiPassTokenSet, newMultiPassLock);
return true;
} else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
oldMultiPassLock.readLock().unlock();
oldMultiPassTokenSet.remove(token);
LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
- oldMultiPassLock);
+ oldMultiPassLock);
return true;
} else {
LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
- oldMultiPassTokenSet);
+ oldMultiPassTokenSet);
// should add throw exception
return false;
}
@@ -750,8 +748,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
- Filter filter)
- throws FileNodeProcessorException {
+ Filter filter)
+ throws FileNodeProcessorException {
// query overflow data
TSDataType dataType = null;
try {
@@ -775,35 +773,35 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata
- = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
+ = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
// bufferwrite data
UnsealedTsFile unsealedTsFile = null;
if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()
- && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
+ && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
unsealedTsFile = new UnsealedTsFile();
unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 1).getFilePath());
if (bufferWriteProcessor == null) {
LOGGER.error(
- "The last of tsfile {} in filenode processor {} is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
+ "The last of tsfile {} in filenode processor {} is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
throw new FileNodeProcessorException(String.format(
- "The last of tsfile %s in filenode processor %s is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
+ "The last of tsfile %s in filenode processor %s is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
}
bufferwritedata = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType);
+ .queryBufferWriteData(deviceId, measurementId, dataType);
try {
- List<Modification> pathModifications = QueryUtils.getPathModifications(
- currentIntervalFileNode.getModFile(), deviceId
- + IoTDBConstant.PATH_SEPARATOR + measurementId
- );
- if (pathModifications.size() > 0) {
- QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
- }
+ List<Modification> pathModifications = QueryUtils.getPathModifications(
+ currentIntervalFileNode.getModFile(), deviceId
+ + IoTDBConstant.PATH_SEPARATOR + measurementId
+ );
+ if (pathModifications.size() > 0) {
+ QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
+ }
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -811,8 +809,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
}
GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
- new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
- bufferwritedata.left);
+ new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
+ bufferwritedata.left);
return new QueryDataSource(globalSortedSeriesDataSource, overflowSeriesDataSource);
}
@@ -820,11 +818,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* append one specified tsfile to this filenode processor.
*
- * @param appendFile the appended tsfile information
+ * @param appendFile the appended tsfile information
* @param appendFilePath the seriesPath of appended file
*/
public void appendFile(IntervalFileNode appendFile, String appendFilePath)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
try {
if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
new File(appendFile.getFilePath()).getParentFile().mkdirs();
@@ -834,11 +832,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
File targetFile = new File(appendFile.getFilePath());
if (!originFile.exists()) {
throw new FileNodeProcessorException(
- String.format("The appended file %s does not exist.", appendFilePath));
+ String.format("The appended file %s does not exist.", appendFilePath));
}
if (targetFile.exists()) {
throw new FileNodeProcessorException(
- String.format("The appended target file %s already exists.", appendFile.getFilePath()));
+ String.format("The appended target file %s already exists.", appendFile.getFilePath()));
}
originFile.renameTo(targetFile);
// append the new tsfile
@@ -854,7 +852,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
addAllFileIntoIndex(newFileNodes);
} catch (Exception e) {
LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", appendFile,
- getProcessorName(), e);
+ getProcessorName(), e);
throw new FileNodeProcessorException(e);
}
}
@@ -865,7 +863,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* @param appendFile the appended tsfile information
*/
public List<String> getOverlapFiles(IntervalFileNode appendFile, String uuid)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
List<String> overlapFiles = new ArrayList<>();
try {
for (IntervalFileNode intervalFileNode : newFileNodes) {
@@ -874,19 +872,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
continue;
}
if (intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
- && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
- .getEndTime(entry.getKey())) {
+ && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
+ .getEndTime(entry.getKey())) {
String relativeFilePath = "postback" + File.separator + uuid + File.separator + "backup"
- + File.separator + intervalFileNode.getRelativePath();
+ + File.separator + intervalFileNode.getRelativePath();
File newFile = new File(
- Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
- relativeFilePath);
+ Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
+ relativeFilePath);
if (!newFile.getParentFile().exists()) {
newFile.getParentFile().mkdirs();
}
java.nio.file.Path link = FileSystems.getDefault().getPath(newFile.getPath());
java.nio.file.Path target = FileSystems.getDefault()
- .getPath(intervalFileNode.getFilePath());
+ .getPath(intervalFileNode.getFilePath());
Files.createLink(link, target);
overlapFiles.add(newFile.getPath());
break;
@@ -904,9 +902,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* add time series.
*/
public void addTimeSeries(String measurementToString, String dataType, String encoding,
- String[] encodingArgs) {
+ String[] encodingArgs) {
ColumnSchema col = new ColumnSchema(measurementToString, TSDataType.valueOf(dataType),
- TSEncoding.valueOf(encoding));
+ TSEncoding.valueOf(encoding));
JSONObject measurement = constrcutMeasurement(col);
fileSchema.registerMeasurement(JsonConverter.convertJsonToMeasurementSchema(measurement));
}
@@ -938,32 +936,32 @@ public class FileNodeProcessor extends Processor implements IStatistic {
long thisMergeTime = System.currentTimeMillis();
long mergeTimeInterval = thisMergeTime - lastMergeTime;
ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastMergeTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisMergeTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The filenode {} last merge time is {}, this merge time is {}, "
- + "merge time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 1000);
+ "The filenode {} last merge time is {}, this merge time is {}, "
+ + "merge time interval is {}s",
+ getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 1000);
}
lastMergeTime = System.currentTimeMillis();
if (overflowProcessor != null) {
if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
- .getConfig().overflowFileSizeThreshold) {
+ .getConfig().overflowFileSizeThreshold) {
LOGGER.info(
- "Skip this merge taks submission, because the size{} of overflow processor {} "
- + "does not reaches the threshold {}.",
- MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
- MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+ "Skip this merge taks submission, because the size{} of overflow processor {} "
+ + "does not reaches the threshold {}.",
+ MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
+ MemUtils.bytesCntToStr(
+ IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
return null;
}
} else {
LOGGER.info(
- "Skip this merge taks submission, because the filenode processor {} "
- + "has no overflow processor.",
- getProcessorName());
+ "Skip this merge taks submission, because the filenode processor {} "
+ + "has no overflow processor.",
+ getProcessorName());
return null;
}
if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
@@ -975,18 +973,18 @@ public class FileNodeProcessor extends Processor implements IStatistic {
merge();
long mergeEndTime = System.currentTimeMillis();
ZonedDateTime startDateTime = ZonedDateTime
- .ofInstant(Instant.ofEpochMilli(mergeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ .ofInstant(Instant.ofEpochMilli(mergeStartTime),
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(mergeEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
long intervalTime = mergeEndTime - mergeStartTime;
LOGGER.info(
- "The filenode processor {} merge start time is {}, "
- + "merge end time is {}, merge consumes {}ms.",
- getProcessorName(), startDateTime, endDateTime, intervalTime);
+ "The filenode processor {} merge start time is {}, "
+ + "merge end time is {}, merge consumes {}ms.",
+ getProcessorName(), startDateTime, endDateTime, intervalTime);
} catch (FileNodeProcessorException e) {
LOGGER.error("The filenode processor {} encountered an error when merging.",
- getProcessorName(), e);
+ getProcessorName(), e);
throw new ErrorDebugException(e);
}
};
@@ -995,13 +993,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
} else {
if (!isOverflowed) {
LOGGER.info(
- "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
- getProcessorName());
+ "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
+ getProcessorName());
} else {
LOGGER.warn(
- "Skip this merge task submission, because last merge task is not over yet, "
- + "the merge filenode processor is {}",
- getProcessorName());
+ "Skip this merge task submission, because last merge task is not over yet, "
+ + "the merge filenode processor is {}",
+ getProcessorName());
}
}
return null;
@@ -1013,7 +1011,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private void prepareForMerge() {
try {
LOGGER.info("The filenode processor {} prepares for merge, closes the bufferwrite processor",
- getProcessorName());
+ getProcessorName());
closeBufferWrite();
// try to get overflow processor
getOverflowProcessor(getProcessorName());
@@ -1021,16 +1019,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
while (!getOverflowProcessor().canBeClosed()) {
try {
LOGGER.info(
- "The filenode processor {} prepares for merge, the overflow {} can't be closed, "
- + "wait 100ms,",
- getProcessorName(), getProcessorName());
+ "The filenode processor {} prepares for merge, the overflow {} can't be closed, "
+ + "wait 100ms,",
+ getProcessorName(), getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.info("The filenode processor {} prepares for merge, closes the overflow processor",
- getProcessorName());
+ getProcessorName());
getOverflowProcessor().close();
} catch (FileNodeProcessorException | OverflowProcessorException | IOException e) {
e.printStackTrace();
@@ -1063,7 +1061,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Map<String, Long> startTimeMap = emptyIntervalFileNode.getStartTimeMap();
if (emptyIntervalFileNode.overflowChangeType != OverflowChangeType.NO_CHANGE) {
Iterator<Entry<String, Long>> iterator = emptyIntervalFileNode.getEndTimeMap().entrySet()
- .iterator();
+ .iterator();
while (iterator.hasNext()) {
Entry<String, Long> entry = iterator.next();
String deviceId = entry.getKey();
@@ -1106,7 +1104,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeStoreToDisk(fileNodeProcessorStore);
} catch (FileNodeProcessorException e) {
LOGGER.error("The filenode processor {} writes restore information error when merging.",
- getProcessorName(), e);
+ getProcessorName(), e);
writeUnlock();
throw new FileNodeProcessorException(e);
}
@@ -1128,12 +1126,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
overflowProcessor.switchWorkToMerge();
} catch (IOException e) {
LOGGER.error("The filenode processor {} can't switch overflow processor from work to merge.",
- getProcessorName(), e);
+ getProcessorName(), e);
writeUnlock();
throw new FileNodeProcessorException(e);
}
LOGGER.info("The filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
+ FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
writeUnlock();
// query tsfile data and overflow data, and merge them
@@ -1146,40 +1144,40 @@ public class FileNodeProcessor extends Processor implements IStatistic {
String filePathBeforeMerge = backupIntervalFile.getRelativePath();
try {
LOGGER.info(
- "The filenode processor {} begins merging the {}/{} tsfile[{}] with overflow file, "
- + "the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
- (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 100));
+ "The filenode processor {} begins merging the {}/{} tsfile[{}] with overflow file, "
+ + "the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
+ (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 100));
long startTime = System.currentTimeMillis();
String newFile = queryAndWriteDataForMerge(backupIntervalFile);
long endTime = System.currentTimeMillis();
long timeConsume = endTime - startTime;
ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(endTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] over, "
- + "start time of merge is {}, end time of merge is {}, time consumption is {}ms,"
- + " the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge, newFile,
- startDateTime, endDateTime, timeConsume,
- (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
+ "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] over, "
+ + "start time of merge is {}, end time of merge is {}, time consumption is {}ms,"
+ + " the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge, newFile,
+ startDateTime, endDateTime, timeConsume,
+ (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
} catch (IOException | WriteProcessException | PathErrorException e) {
LOGGER.error("Merge: query and write data error.", e);
throw new FileNodeProcessorException(e);
}
} else if (backupIntervalFile.overflowChangeType == OverflowChangeType.MERGING_CHANGE) {
LOGGER.error("The overflowChangeType of backupIntervalFile must not be {}",
- OverflowChangeType.MERGING_CHANGE);
+ OverflowChangeType.MERGING_CHANGE);
// handle this error, throw one runtime exception
throw new FileNodeProcessorException(
- "The overflowChangeType of backupIntervalFile must not be "
- + OverflowChangeType.MERGING_CHANGE);
+ "The overflowChangeType of backupIntervalFile must not be "
+ + OverflowChangeType.MERGING_CHANGE);
} else {
LOGGER.debug(
- "The filenode processor {} is merging, the interval file {} doesn't need to be merged.",
- getProcessorName(), backupIntervalFile.getRelativePath());
+ "The filenode processor {} is merging, the interval file {} doesn't need to be merged.",
+ getProcessorName(), backupIntervalFile.getRelativePath());
}
}
@@ -1211,8 +1209,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
result.add(emptyIntervalFileNode.backUp());
if (!newFileNodes.isEmpty()) {
throw new FileNodeProcessorException(
- String.format("The status of empty file is %s, but the new file list is not empty",
- emptyIntervalFileNode.overflowChangeType));
+ String.format("The status of empty file is %s, but the new file list is not empty",
+ emptyIntervalFileNode.overflowChangeType));
}
return result;
}
@@ -1241,15 +1239,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
IntervalFileNode node = new IntervalFileNode(startTimeMap, endTimeMap,
- intervalFileNode.overflowChangeType, intervalFileNode.getBaseDirIndex(),
- intervalFileNode.getRelativePath());
+ intervalFileNode.overflowChangeType, intervalFileNode.getBaseDirIndex(),
+ intervalFileNode.getRelativePath());
result.add(node);
}
}
} else {
LOGGER.error("No file was changed when merging, the filenode is {}", getProcessorName());
throw new FileNodeProcessorException(
- "No file was changed when merging, the filenode is " + getProcessorName());
+ "No file was changed when merging, the filenode is " + getProcessorName());
}
return result;
}
@@ -1308,9 +1306,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
private void switchMergeToWaitingv2(List<IntervalFileNode> backupIntervalFiles, boolean needEmpty)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING);
+ FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING);
writeLock();
try {
oldMultiPassTokenSet = newMultiPassTokenSet;
@@ -1348,7 +1346,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
temp.overflowChangeType = OverflowChangeType.CHANGED;
} else {
changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
- newFile.getEndTime(deviceId));
+ newFile.getEndTime(deviceId));
}
}
}
@@ -1383,11 +1381,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeStoreToDisk(fileNodeProcessorStore);
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "Merge: failed to write filenode information to revocery file, the filenode is {}.",
- getProcessorName(), e);
+ "Merge: failed to write filenode information to revocery file, the filenode is {}.",
+ getProcessorName(), e);
throw new FileNodeProcessorException(
- "Merge: write filenode information to revocery file failed, the filenode is "
- + getProcessorName());
+ "Merge: write filenode information to revocery file failed, the filenode is "
+ + getProcessorName());
}
}
} finally {
@@ -1396,15 +1394,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private void switchWaitingToWorkingv2(List<IntervalFileNode> backupIntervalFiles)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
+ FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
if (oldMultiPassLock != null) {
LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
- oldMultiPassTokenSet,
- oldMultiPassLock);
+ oldMultiPassTokenSet,
+ oldMultiPassLock);
oldMultiPassLock.writeLock().lock();
}
try {
@@ -1417,7 +1415,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
List<File> bufferwriteDirList = new ArrayList<>();
for (String bufferwriteDirPath : bufferwriteDirPathList) {
if (bufferwriteDirPath.length() > 0
- && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
+ && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
}
bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
@@ -1439,7 +1437,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// add the restore file, if the last file is not closed
if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()) {
String bufferFileRestorePath =
- newFileNodes.get(newFileNodes.size() - 1).getFilePath() + ".restore";
+ newFileNodes.get(newFileNodes.size() - 1).getFilePath() + ".restore";
bufferFiles.add(bufferFileRestorePath);
}
@@ -1472,10 +1470,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
} catch (IOException e) {
LOGGER.info(
- "The filenode processor {} encountered an error when its "
- + "status switched from {} to {}.",
- getProcessorName(), FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE,
- e);
+ "The filenode processor {} encountered an error when its "
+ + "status switched from {} to {}.",
+ getProcessorName(), FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE,
+ e);
throw new FileNodeProcessorException(e);
} finally {
writeUnlock();
@@ -1494,12 +1492,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
String measurementId) {
TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deviceId);
record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId,
- timeValuePair.getValue().getValue().toString()));
+ timeValuePair.getValue().getValue().toString()));
return record;
}
private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
- throws IOException, WriteProcessException, FileNodeProcessorException, PathErrorException {
+ throws IOException, WriteProcessException, FileNodeProcessorException, PathErrorException {
Map<String, Long> startTimeMap = new HashMap<>();
Map<String, Long> endTimeMap = new HashMap<>();
@@ -1537,26 +1535,26 @@ public class FileNodeProcessor extends Processor implements IStatistic {
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
- measurementId, dataType, true);
+ measurementId, dataType, true);
Filter timeFilter = FilterFactory
- .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
- TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+ .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+ TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
IReader seriesReader = SeriesReaderFactory.getInstance()
- .createSeriesReaderForMerge(backupIntervalFile,
- overflowSeriesDataSource, seriesFilter);
+ .createSeriesReaderForMerge(backupIntervalFile,
+ overflowSeriesDataSource, seriesFilter);
try {
if (!seriesReader.hasNext()) {
LOGGER.debug(
- "The time-series {} has no data with the filter {} in the filenode processor {}",
- path, seriesFilter, getProcessorName());
+ "The time-series {} has no data with the filter {} in the filenode processor {}",
+ path, seriesFilter, getProcessorName());
} else {
numOfChunk++;
TimeValuePair timeValuePair = seriesReader.next();
if (fileIoWriter == null) {
baseDir = directories.getNextFolderForTsfile();
fileName = String.valueOf(timeValuePair.getTimestamp()
- + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
+ + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
outputPath = constructOutputFilePath(baseDir, getProcessorName(), fileName);
fileName = getProcessorName() + File.separatorChar + fileName;
fileIoWriter = new TsFileIOWriter(new File(outputPath));
@@ -1577,11 +1575,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
int pageSizeThreshold = TsFileConf.pageSizeInByte;
ChunkWriterImpl seriesWriterImpl = new ChunkWriterImpl(measurementSchema, pageWriter,
- pageSizeThreshold);
+ pageSizeThreshold);
// write the series data
recordCount += writeOneSeries(deviceId, measurementId, seriesWriterImpl, dataType,
- seriesReader,
- startTimeMap, endTimeMap, timeValuePair);
+ seriesReader,
+ startTimeMap, endTimeMap, timeValuePair);
// flush the series data
seriesWriterImpl.writeToFileWriter(fileIoWriter);
}
@@ -1599,7 +1597,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
} finally {
- if(mergeDeleteLock.isLocked())
+ if (mergeDeleteLock.isLocked())
mergeDeleteLock.unlock();
}
@@ -1639,7 +1637,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+ .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1720,7 +1718,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+ .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1741,7 +1739,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
timeValuePair = seriesReader.next();
endTime = timeValuePair.getTimestamp();
seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+ .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
}
if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
endTimeMap.put(deviceId, endTime);
@@ -1762,7 +1760,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
File dataDir = new File(baseDir);
if (!dataDir.exists()) {
LOGGER.warn("The bufferwrite processor data dir doesn't exists, create new directory {}",
- baseDir);
+ baseDir);
dataDir.mkdirs();
}
File outputFile = new File(dataDir, fileName);
@@ -1786,7 +1784,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private FileSchema getFileSchemaFromColumnSchema(List<ColumnSchema> schemaList, String deviceType)
- throws WriteProcessException {
+ throws WriteProcessException {
JSONArray rowGroup = new JSONArray();
for (ColumnSchema col : schemaList) {
@@ -1824,8 +1822,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
} else {
LOGGER
- .info("The filenode {} can't be closed, because it can't get oldMultiPassLock {}",
- getProcessorName(), oldMultiPassLock);
+ .info("The filenode {} can't be closed, because it can't get oldMultiPassLock {}",
+ getProcessorName(), oldMultiPassLock);
return false;
}
} else {
@@ -1836,13 +1834,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
} else {
LOGGER.info("The filenode {} can't be closed, because it can't get newMultiPassLock {}",
- getProcessorName(), newMultiPassLock);
+ getProcessorName(), newMultiPassLock);
return false;
}
} else {
LOGGER.info("The filenode {} can't be closed, because the filenode status is {}",
- getProcessorName(),
- isMerging);
+ getProcessorName(),
+ isMerging);
return false;
}
}
@@ -1867,7 +1865,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
while (!bufferWriteProcessor.canBeClosed()) {
try {
LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
- bufferWriteProcessor.getProcessorName());
+ bufferWriteProcessor.getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
@@ -1913,7 +1911,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
while (!overflowProcessor.canBeClosed()) {
try {
LOGGER.info("The overflow {} can't be closed, wait 100ms",
- overflowProcessor.getProcessorName());
+ overflowProcessor.getProcessorName());
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
@@ -1970,14 +1968,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
- throws FileNodeProcessorException {
+ throws FileNodeProcessorException {
synchronized (fileNodeRestoreFilePath) {
SerializeUtil<FileNodeProcessorStore> serializeUtil = new SerializeUtil<>();
try {
serializeUtil.serialize(fileNodeProcessorStore, fileNodeRestoreFilePath);
LOGGER.debug("The filenode processor {} writes restore information to the restore file",
- getProcessorName());
+ getProcessorName());
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -1991,9 +1989,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
SerializeUtil<FileNodeProcessorStore> serializeUtil = new SerializeUtil<>();
try {
fileNodeProcessorStore = serializeUtil.deserialize(fileNodeRestoreFilePath)
- .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
- new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
- new ArrayList<IntervalFileNode>(), FileNodeProcessorStatus.NONE, 0));
+ .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
+ new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
+ new ArrayList<IntervalFileNode>(), FileNodeProcessorStatus.NONE, 0));
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -2006,48 +2004,62 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* { mergeIndex(); switchMergeIndex(); }
*/
- public String getFileNodeRestoreFilePath() {
- return fileNodeRestoreFilePath;
- }
+ public String getFileNodeRestoreFilePath() {
+ return fileNodeRestoreFilePath;
+ }
+
+ /**
+ * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the delete range is (0, timestamp].
+ */
+ public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+ // TODO: how to avoid partial deletion?
+ mergeDeleteLock.lock();
+ long version = versionController.nextVersion();
- /**
- * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId.
- * @param deviceId the deviceId of the timeseries to be deleted.
- * @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
- */
- public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
- // TODO: how to avoid partial deletion?
- mergeDeleteLock.lock();
- long version = versionController.nextVersion();
+ // record what files are updated so we can roll back them in case of exception
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
- try {
- String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
- Deletion deletion = new Deletion(fullPath, version, timestamp);
- if (mergingModification != null) {
- mergingModification.write(deletion);
- }
+ try {
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
+ if (mergingModification != null) {
+ mergingModification.write(deletion);
+ updatedModFiles.add(mergingModification);
+ }
- deleteBufferWriteFiles(deviceId, deletion);
- // delete data in memory
- OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
- overflowProcessor.delete(deviceId, measurementId, timestamp, version);
- if (bufferWriteProcessor != null) {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
- }
- } finally {
- mergeDeleteLock.unlock();
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ // delete data in memory
+ OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+ overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ }
+ } catch (Exception e) {
+ // roll back
+ for (ModificationFile modFile : updatedModFiles) {
+ modFile.abort();
}
+ throw new IOException(e);
+ } finally {
+ mergeDeleteLock.unlock();
}
+ }
- private void deleteBufferWriteFiles(String deviceId, Deletion deletion) throws IOException {
+ private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
+ List<ModificationFile> updatedModFiles) throws IOException {
if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) {
currentIntervalFileNode.getModFile().write(deletion);
+ updatedModFiles.add(currentIntervalFileNode.getModFile());
}
for (IntervalFileNode fileNode : newFileNodes) {
- if(fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)) {
+ if (fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)) {
fileNode.getModFile().write(deletion);
+ updatedModFiles.add(fileNode.getModFile());
}
}
}
@@ -2056,17 +2068,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Similar to delete(), but only deletes data in BufferWrite.
* Only used by WAL recovery.
*/
- public void deleteBufferWrite(String deviceId, String measurementId, long timestamp) throws IOException {
- String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
- long version = versionController.nextVersion();
- Deletion deletion = new Deletion(fullPath, version, timestamp);
+ public void deleteBufferWrite(String deviceId, String measurementId, long timestamp) throws IOException {
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ long version = versionController.nextVersion();
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
- deleteBufferWriteFiles(deviceId, deletion);
- if (bufferWriteProcessor != null) {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
- }
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
}
+ }
/**
* Similar to delete(), but only deletes data in Overflow.
@@ -2079,6 +2091,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Deletion deletion = new Deletion(fullPath, version, timestamp);
OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
- overflowProcessor.delete(deviceId, measurementId, timestamp, version);
+ overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
}
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 2815e36..d09ab84 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.modification;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.engine.modification.io.ModificationReader;
@@ -32,13 +33,14 @@ import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
public class ModificationFile {
public static final String FILE_SUFFIX = ".mods";
- private Collection<Modification> modifications;
+ private List<Modification> modifications;
private ModificationWriter writer;
private ModificationReader reader;
private String filePath;
/**
* Construct a ModificationFile using a file as its storage.
+ *
* @param filePath the path of the storage file.
*/
public ModificationFile(String filePath) {
@@ -50,7 +52,7 @@ public class ModificationFile {
private void init() throws IOException {
synchronized (this) {
- Collection<Modification> mods = reader.read();
+ List<Modification> mods = (List<Modification>) reader.read();
if (mods == null) {
mods = new ArrayList<>();
}
@@ -74,9 +76,19 @@ public class ModificationFile {
}
}
+ public void abort() throws IOException {
+ synchronized (this) {
+ if (modifications.size() > 0) {
+ writer.abort();
+ modifications.remove(modifications.size() - 1);
+ }
+ }
+ }
+
/**
* Write a modification in this file. The modification will first be written to the persistent
* store then the memory cache.
+ *
* @param mod the modification to be written.
* @throws IOException if IOException is thrown when writing the modification to the store.
*/
@@ -90,6 +102,7 @@ public class ModificationFile {
/**
* Get all modifications stored in this file.
+ *
* @return an ArrayList of modifications.
*/
public Collection<Modification> getModifications() throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 9f11bae..a6d0d49 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -39,6 +39,7 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class);
private static final String SEPARATOR = ",";
+ private static final String ABORT_MARK = "aborted";
private String filePath;
private BufferedWriter writer;
@@ -64,7 +65,11 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
List<Modification> modificationList = new ArrayList<>();
try {
while ((line = reader.readLine()) != null) {
- modificationList.add(decodeModification(line));
+ if (line.equals(ABORT_MARK) && modificationList.size() > 0) {
+ modificationList.remove(modificationList.size() - 1);
+ } else {
+ modificationList.add(decodeModification(line));
+ }
}
} catch (IOException e) {
reader.close();
@@ -82,6 +87,16 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
}
@Override
+ public void abort() throws IOException {
+ if (writer == null) {
+ writer = new BufferedWriter(new FileWriter(filePath, true));
+ }
+ writer.write(ABORT_MARK);
+ writer.newLine();
+ writer.flush();
+ }
+
+ @Override
public void write(Modification mod) throws IOException {
if (writer == null) {
writer = new BufferedWriter(new FileWriter(filePath, true));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 5c3806b..61a7d34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -36,4 +36,9 @@ public interface ModificationWriter {
* Release resources like streams.
*/
void close() throws IOException;
+
+ /**
+ * Abort last modification.
+ */
+ void abort() throws IOException;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
index d834ffa..40aa7c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
@@ -236,17 +237,19 @@ public class OverflowProcessor extends Processor {
/**
* Delete data of a timeseries whose time ranges from 0 to timestamp.
- *
- * @param deviceId the deviceId of the timeseries.
+ * @param deviceId the deviceId of the timeseries.
* @param measurementId the measurementId of the timeseries.
* @param timestamp the upper-bound of deletion time.
* @param version the version number of this deletion.
+ * @param updatedModFiles add successfully updated Modification files to the list, and abort them
+ * when exception is
*/
- public void delete(String deviceId, String measurementId, long timestamp, long version) throws IOException {
- workResource.delete(deviceId, measurementId, timestamp, version);
+ public void delete(String deviceId, String measurementId, long timestamp, long version,
+ List<ModificationFile> updatedModFiles) throws IOException {
+ workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
workSupport.delete(deviceId, measurementId, timestamp, false);
if (flushStatus.isFlushing()) {
- mergeResource.delete(deviceId, measurementId, timestamp, version);
+ mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
flushSupport.delete(deviceId, measurementId, timestamp, true);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
index 3a528f0..9383d61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -294,14 +293,17 @@ public class OverflowResource {
/**
* Delete data of a timeseries whose time ranges from 0 to timestamp.
- *
- * @param deviceId the deviceId of the timeseries.
+ * @param deviceId the deviceId of the timeseries.
* @param measurementId the measurementId of the timeseries.
* @param timestamp the upper-bound of deletion time.
+ * @param updatedModFiles add successfully updated modificationFile to this list, so that the
+ * deletion can be aborted when exception is thrown.
*/
- public void delete(String deviceId, String measurementId, long timestamp, long version)
+ public void delete(String deviceId, String measurementId, long timestamp, long version,
+ List<ModificationFile> updatedModFiles)
throws IOException {
modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR
+ measurementId, version, timestamp));
+ updatedModFiles.add(modificationFile);
}
}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
index 6907a3d..be19061 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -59,4 +59,40 @@ public class ModificationFileTest {
new File(tempFileName).delete();
}
}
+
+ @Test
+ public void testAbort() {
+ String tempFileName = "mod.temp";
+ Modification[] modifications = new Modification[]{
+ new Deletion("p1", 1, 1),
+ new Deletion("p2", 2, 2),
+ new Deletion("p3", 3, 3),
+ new Deletion("p4", 4, 4),
+ };
+ try {
+ ModificationFile mFile = new ModificationFile(tempFileName);
+ for (int i = 0; i < 2; i++) {
+ mFile.write(modifications[i]);
+ }
+ List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+ for (int i = 0; i < 2; i++) {
+ assertEquals(modifications[i], modificationList.get(i));
+ }
+
+ for (int i = 2; i < 4; i++) {
+ mFile.write(modifications[i]);
+ }
+ modificationList = (List<Modification>) mFile.getModifications();
+ mFile.abort();
+
+ for (int i = 0; i < 3; i++) {
+ assertEquals(modifications[i], modificationList.get(i));
+ }
+ mFile.close();
+ } catch (IOException e) {
+ fail(e.getMessage());
+ } finally {
+ new File(tempFileName).delete();
+ }
+ }
}
\ No newline at end of file