You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/01/26 02:45:05 UTC
[incubator-iotdb] branch fix_sonar_jt updated: fix other problems
in FileNodeProcessor
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch fix_sonar_jt
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/fix_sonar_jt by this push:
new 6e16e93 fix other problems in FileNodeProcessor
6e16e93 is described below
commit 6e16e93ea27b334bdbfd4a51c9c19b372637bcd4
Author: 江天 <jt...@163.com>
AuthorDate: Sat Jan 26 10:44:17 2019 +0800
fix other problems in FileNodeProcessor
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 17 +-
.../db/engine/filenode/FileNodeProcessor.java | 194 +++++++++++----------
2 files changed, 110 insertions(+), 101 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 0139e03..d206960 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -418,14 +418,15 @@ public class FileNodeManager implements IStatistic, IService {
if (bufferWriteProcessor
.getFileSize() > IoTDBDescriptor.getInstance()
.getConfig().bufferwriteFileSizeThreshold) {
- String memSize = MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize());
- String memThrehold = MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold);
- LOGGER.info(
- "The filenode processor {} will close the bufferwrite processor, "
- + "because the size[{}] of tsfile {} reaches the threshold {}",
- filenodeName, memSize,
- bufferWriteProcessor.getFileName(), memThrehold);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "The filenode processor {} will close the bufferwrite processor, "
+ + "because the size[{}] of tsfile {} reaches the threshold {}",
+ filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
+ bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
+ IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
+ }
+
fileNodeProcessor.closeBufferWrite();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 50f2b40..abac706 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -72,8 +72,6 @@ import org.apache.iotdb.db.utils.FileSchemaUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.common.constant.JsonFormatConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -88,7 +86,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.JsonConverter;
@@ -108,7 +105,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
+ " will be overflowed in the filenode processor {}, ";
private static final String RESTORE_FILE_SUFFIX = ".restore";
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
- private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
private static final MManager mManager = MManager.getInstance();
private static final Directories directories = Directories.getInstance();
@@ -120,15 +116,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private volatile boolean isOverflowed;
private Map<String, Long> lastUpdateTimeMap;
private Map<String, Long> flushLastUpdateTimeMap;
- private Map<String, List<IntervalFileNode>> invertedindexOfFiles;
+ private Map<String, List<IntervalFileNode>> invertedIndexOfFiles;
private IntervalFileNode emptyIntervalFileNode;
private IntervalFileNode currentIntervalFileNode;
private List<IntervalFileNode> newFileNodes;
private FileNodeProcessorStatus isMerging;
// this is used when work->merge operation
- private int numOfMergeFile = 0;
- private FileNodeProcessorStore fileNodeProcessorStore = null;
- private String fileNodeRestoreFilePath = null;
+ private int numOfMergeFile;
+ private FileNodeProcessorStore fileNodeProcessorStore;
+ private String fileNodeRestoreFilePath;
private final Object fileNodeRestoreLock = new Object();
private String baseDirPath;
// last merge time
@@ -142,41 +138,34 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// system recovery
private boolean shouldRecovery = false;
// statistic monitor parameters
- private Map<String, Action> parameters = null;
+ private Map<String, Action> parameters;
private FileSchema fileSchema;
- private Action flushFileNodeProcessorAction = new Action() {
-
- @Override
- public void act() throws ActionException {
- synchronized (fileNodeProcessorStore) {
- try {
- writeStoreToDisk(fileNodeProcessorStore);
- } catch (FileNodeProcessorException e) {
- throw new ActionException(e);
- }
+ private Action flushFileNodeProcessorAction = () -> {
+ synchronized (fileNodeProcessorStore) {
+ try {
+ writeStoreToDisk(fileNodeProcessorStore);
+ } catch (FileNodeProcessorException e) {
+ throw new ActionException(e);
}
}
};
- private Action bufferwriteFlushAction = new Action() {
-
- @Override
- public void act() throws ActionException {
- // update the lastUpdateTime Notice: Thread safe
- synchronized (fileNodeProcessorStore) {
- // deep copy
- Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
- // update flushLastUpdateTimeMap
- for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
- flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
- }
- fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
+ private Action bufferwriteFlushAction = () -> {
+ // update the lastUpdateTime Notice: Thread safe
+ synchronized (fileNodeProcessorStore) {
+ // deep copy
+ Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
+ // update flushLastUpdateTimeMap
+ for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
+ flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
}
+ fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
}
};
+
private Action bufferwriteCloseAction = new Action() {
@Override
- public void act() throws ActionException {
+ public void act() {
synchronized (fileNodeProcessorStore) {
fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
addLastTimeToIntervalFile();
@@ -197,18 +186,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
};
- private Action overflowFlushAction = new Action() {
-
- @Override
- public void act() throws ActionException {
+ private Action overflowFlushAction = () -> {
- // update the new IntervalFileNode List and emptyIntervalFile.
- // Notice: thread safe
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setOverflowed(isOverflowed);
- fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- }
+ // update the new IntervalFileNode List and emptyIntervalFile.
+ // Notice: thread safe
+ synchronized (fileNodeProcessorStore) {
+ fileNodeProcessorStore.setOverflowed(isOverflowed);
+ fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
}
};
// Token for query which used to
@@ -247,7 +232,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (!dataDir.exists()) {
dataDir.mkdirs();
LOGGER.info(
- "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
+ "The data directory of the filenode processor {} doesn't exist. Create new " +
+ "directory {}",
getProcessorName(), baseDirPath);
}
fileNodeRestoreFilePath = new File(dataDir, processorName + RESTORE_FILE_SUFFIX).getPath();
@@ -255,7 +241,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "The fileNode processor {} encountered an error when recoverying restore information.",
+ "The fileNode processor {} encountered an error when recoverying restore " +
+ "information.",
processorName, e);
throw new FileNodeProcessorException(e);
}
@@ -265,7 +252,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
newFileNodes = fileNodeProcessorStore.getNewFileNodes();
isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus();
numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile();
- invertedindexOfFiles = new HashMap<>();
+ invertedIndexOfFiles = new HashMap<>();
// deep clone
flushLastUpdateTimeMap = new HashMap<>();
for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
@@ -326,7 +313,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
Map<String, AtomicLong> hashMap = getStatParamsHashMap();
- tsRecord.dataPointList = new ArrayList<DataPoint>();
+ tsRecord.dataPointList = new ArrayList<>();
for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), entry.getValue().get()));
}
@@ -357,10 +344,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
void setIntervalFileNodeStartTime(String deviceId) {
if (currentIntervalFileNode.getStartTime(deviceId) == -1) {
currentIntervalFileNode.setStartTime(deviceId, flushLastUpdateTimeMap.get(deviceId));
- if (!invertedindexOfFiles.containsKey(deviceId)) {
- invertedindexOfFiles.put(deviceId, new ArrayList<>());
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ invertedIndexOfFiles.put(deviceId, new ArrayList<>());
}
- invertedindexOfFiles.get(deviceId).add(currentIntervalFileNode);
+ invertedIndexOfFiles.get(deviceId).add(currentIntervalFileNode);
}
}
@@ -382,17 +369,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private void addAllFileIntoIndex(List<IntervalFileNode> fileList) {
// clear map
- invertedindexOfFiles.clear();
+ invertedIndexOfFiles.clear();
// add all file to index
for (IntervalFileNode fileNode : fileList) {
if (fileNode.getStartTimeMap().isEmpty()) {
continue;
}
for (String deviceId : fileNode.getStartTimeMap().keySet()) {
- if (!invertedindexOfFiles.containsKey(deviceId)) {
- invertedindexOfFiles.put(deviceId, new ArrayList<>());
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ invertedIndexOfFiles.put(deviceId, new ArrayList<>());
}
- invertedindexOfFiles.get(deviceId).add(fileNode);
+ invertedIndexOfFiles.get(deviceId).add(fileNode);
}
}
}
@@ -441,10 +428,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
String baseDir = directories
.getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
- LOGGER.info(
- "The filenode processor {} will recovery the bufferwrite processor, "
- + "the bufferwrite file is {}",
- getProcessorName(), fileNames[fileNames.length - 1]);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "The filenode processor {} will recovery the bufferwrite processor, "
+ + "the bufferwrite file is {}",
+ getProcessorName(), fileNames[fileNames.length - 1]);
+ }
+
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
fileNames[fileNames.length - 1], parameters, fileSchema);
@@ -538,12 +528,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
if (overflowProcessor == null) {
- Map<String, Action> parameters = new HashMap<>();
+ Map<String, Action> paramparams = new HashMap<>();
// construct processor or restore
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
- parameters
+ paramparams.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
+ paramparams
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema);
+ overflowProcessor = new OverflowProcessor(processorName, paramparams, fileSchema);
}
return overflowProcessor;
}
@@ -611,7 +601,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* For insert overflow.
*/
public void changeTypeToChanged(String deviceId, long timestamp) {
- if (!invertedindexOfFiles.containsKey(deviceId)) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
WARN_NO_SUCH_OVERFLOWED_FILE
+ "the data is [device:{},time:{}]",
@@ -620,7 +610,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
} else {
- List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
changeTypeToChanged(temp.get(index), deviceId);
}
@@ -637,7 +627,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* For update overflow.
*/
public void changeTypeToChanged(String deviceId, long startTime, long endTime) {
- if (!invertedindexOfFiles.containsKey(deviceId)) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
WARN_NO_SUCH_OVERFLOWED_FILE
+ "the data is [device:{}, start time:{}, end time:{}]",
@@ -646,7 +636,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
} else {
- List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
int left = searchIndexNodeByTimestamp(deviceId, startTime, temp);
int right = searchIndexNodeByTimestamp(deviceId, endTime, temp);
for (int i = left; i <= right; i++) {
@@ -659,7 +649,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* For delete overflow.
*/
public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
- if (!invertedindexOfFiles.containsKey(deviceId)) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
LOGGER.warn(
WARN_NO_SUCH_OVERFLOWED_FILE
+ "the data is [device:{}, delete time:{}]",
@@ -668,7 +658,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
emptyIntervalFileNode.changeTypeToChanged(isMerging);
} else {
- List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
for (int i = 0; i <= index; i++) {
temp.get(i).changeTypeToChanged(isMerging);
@@ -713,13 +703,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* remove multiple pass lock.
+ * TODO: use the return value or remove it.
*/
public boolean removeMultiPassLock(int token) {
if (newMultiPassTokenSet.contains(token)) {
newMultiPassLock.readLock().unlock();
newMultiPassTokenSet.remove(token);
LOGGER
- .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
+ .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+ getProcessorName(),
newMultiPassTokenSet, newMultiPassLock);
return true;
} else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
@@ -743,7 +735,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId)
throws FileNodeProcessorException {
// query overflow data
- TSDataType dataType = null;
+ TSDataType dataType;
try {
dataType = mManager.getSeriesType(deviceId + "." + measurementId);
} catch (PathErrorException e) {
@@ -814,9 +806,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
if (targetFile.exists()) {
throw new FileNodeProcessorException(
- String.format("The appended target file %s already exists.", appendFile.getFilePath()));
+ String.format("The appended target file %s already exists.",
+ appendFile.getFilePath()));
+ }
+ if (!originFile.renameTo(targetFile)) {
+ LOGGER.warn("File renaming failed when appending new file. Origin: {}, target: {}",
+ originFile.getPath(),
+ targetFile.getPath());
}
- originFile.renameTo(targetFile);
// append the new tsfile
this.newFileNodes.add(appendFile);
// update the lastUpdateTime
@@ -914,12 +911,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (overflowProcessor != null) {
if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
.getConfig().overflowFileSizeThreshold) {
- LOGGER.info(
- "Skip this merge taks submission, because the size{} of overflow processor {} "
- + "does not reaches the threshold {}.",
- MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
- MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Skip this merge taks submission, because the size{} of overflow processor {} "
+ + "does not reaches the threshold {}.",
+ MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
+ MemUtils.bytesCntToStr(
+ IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+ }
return null;
}
} else {
@@ -937,7 +936,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
} else {
if (!isOverflowed) {
LOGGER.info(
- "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
+ "Skip this merge taks submission, because the filenode processor {} is not " +
+ "overflowed.",
getProcessorName());
} else {
LOGGER.warn(
@@ -1110,8 +1110,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
while (iterator.hasNext()) {
Entry<String, Long> entry = iterator.next();
String deviceId = entry.getKey();
- if (invertedindexOfFiles.containsKey(deviceId)) {
- invertedindexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
+ if (invertedIndexOfFiles.containsKey(deviceId)) {
+ invertedIndexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
startTimeMap.remove(deviceId);
iterator.remove();
}
@@ -1163,7 +1163,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Map<String, Long> startTimeMap = new HashMap<>();
Map<String, Long> endTimeMap = new HashMap<>();
for (String deviceId : intervalFileNode.getEndTimeMap().keySet()) {
- List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
int index = temp.indexOf(intervalFileNode);
int size = temp.size();
// start time
@@ -1245,7 +1245,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeStoreToDisk(fileNodeProcessorStore);
} catch (FileNodeProcessorException e) {
LOGGER.error(
- "Merge: failed to write filenode information to revocery file, the filenode is {}.",
+ "Merge: failed to write filenode information to revocery file, the filenode is " +
+ "{}.",
getProcessorName(), e);
throw new FileNodeProcessorException(
"Merge: write filenode information to revocery file failed, the filenode is "
@@ -1259,8 +1260,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private void updateEmpty(IntervalFileNode empty, List<IntervalFileNode> result) {
for (String deviceId : empty.getStartTimeMap().keySet()) {
- if (invertedindexOfFiles.containsKey(deviceId)) {
- IntervalFileNode temp = invertedindexOfFiles.get(deviceId).get(0);
+ if (invertedIndexOfFiles.containsKey(deviceId)) {
+ IntervalFileNode temp = invertedIndexOfFiles.get(deviceId).get(0);
if (temp.getMergeChanged().contains(deviceId)) {
empty.setOverflowChangeType(OverflowChangeType.CHANGED);
break;
@@ -1383,8 +1384,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
continue;
}
for (File file : files) {
- if (!bufferFiles.contains(file.getPath())) {
- file.delete();
+ if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
+ LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
}
}
}
@@ -1412,7 +1413,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
List<Path> pathList = new ArrayList<>();
mergeIsRowGroupHasData = false;
mergeStartPos = -1;
- ChunkGroupFooter footer = null;
+ ChunkGroupFooter footer;
int numOfChunk = 0;
try {
List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
@@ -1427,7 +1428,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
continue;
}
for (Path path : pathList) {
- // query one measurenment in the special deviceId
+ // query one measurement in the special deviceId
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
@@ -1795,7 +1796,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Objects.equals(statParamsHashMap, that.statParamsHashMap) &&
Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) &&
Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) &&
- Objects.equals(invertedindexOfFiles, that.invertedindexOfFiles) &&
+ Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) &&
Objects.equals(emptyIntervalFileNode, that.emptyIntervalFileNode) &&
Objects.equals(currentIntervalFileNode, that.currentIntervalFileNode) &&
Objects.equals(newFileNodes, that.newFileNodes) &&
@@ -1819,7 +1820,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed, lastUpdateTimeMap, flushLastUpdateTimeMap, invertedindexOfFiles, emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, isMerging, numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath, lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet, newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters, fileSchema, flu [...]
+ return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed,
+ lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
+ emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, isMerging,
+ numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath,
+ lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
+ newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+ fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
+ bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
}
public class MergeRunnale implements Runnable {