You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 12:11:50 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
lock and unlock log
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 1d4ff36 add lock and unlock log
1d4ff36 is described below
commit 1d4ff3676a6e3dc88c2cdbc6a35dd5197d629413
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 20:11:36 2019 +0800
add lock and unlock log
---
.../java/org/apache/iotdb/db/engine/Processor.java | 31 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 24 +-
.../db/engine/filenode/FileNodeProcessor.java | 68 ++--
.../iotdb/db/engine/overflow/io/OverflowIO.java | 2 +-
.../db/engine/overflow/io/OverflowResource.java | 2 +-
.../recover/BufferWriteRecoverPerformer.java | 365 ---------------------
6 files changed, 77 insertions(+), 415 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 5c515ff..0388476 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Processor is used for implementing different processor with different operation.<br>
@@ -34,7 +36,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
*/
public abstract class Processor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
private final ReadWriteLock lock;
+ private long start;
protected String processorName;
/**
@@ -52,6 +56,10 @@ public abstract class Processor {
*/
public void readUnlock() {
lock.readLock().unlock();
+ start = System.currentTimeMillis() - start;
+ if (start > 1000) {
+ LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
+ }
}
/**
@@ -59,6 +67,7 @@ public abstract class Processor {
*/
public void readLock() {
lock.readLock().lock();
+ start = System.currentTimeMillis();
}
/**
@@ -66,6 +75,7 @@ public abstract class Processor {
*/
public void writeLock() {
lock.writeLock().lock();
+ start = System.currentTimeMillis();
}
/**
@@ -73,6 +83,10 @@ public abstract class Processor {
*/
public void writeUnlock() {
lock.writeLock().unlock();
+ start = System.currentTimeMillis() - start;
+ if (start > 1000) {
+ LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
+ }
}
/**
@@ -85,6 +99,7 @@ public abstract class Processor {
} else {
lock.readLock().lock();
}
+ start = System.currentTimeMillis();
}
public boolean tryLock(boolean isWriteLock) {
@@ -105,6 +120,10 @@ public abstract class Processor {
} else {
readUnlock();
}
+ start = System.currentTimeMillis() - start;
+ if (start > 1000) {
+ LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
+ }
}
/**
@@ -122,7 +141,11 @@ public abstract class Processor {
* @return
*/
public boolean tryWriteLock() {
- return lock.writeLock().tryLock();
+ boolean result = lock.writeLock().tryLock();
+ if (result) {
+ start = System.currentTimeMillis();
+ }
+ return result;
}
/**
@@ -131,7 +154,11 @@ public abstract class Processor {
* @return
*/
public boolean tryReadLock() {
- return lock.readLock().tryLock();
+ boolean result = lock.readLock().tryLock();
+ if (result) {
+ start = System.currentTimeMillis();
+ }
+ return result;
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 2b1f8ce..a1fe9cd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -128,18 +128,18 @@ public class FileNodeManager implements IStatistic, IService {
statMonitor.registerStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this);
}
- closedProcessorCleaner.scheduleWithFixedDelay(()->{
- int size = 0;
- for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
- size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
- }
- if (size > 5) {
- LOGGER.info("Current closing processor number is {}", size);
- }
- for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
- fileNodeProcessor.checkAllClosingProcessors();
- }
- }, 0, 3000, TimeUnit.MILLISECONDS);
+// closedProcessorCleaner.scheduleWithFixedDelay(()->{
+// int size = 0;
+// for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
+// size += fileNodeProcessor.getClosingBufferWriteProcessor().size();
+// }
+// if (size > 5) {
+// LOGGER.info("Current closing processor number is {}", size);
+// }
+// for (FileNodeProcessor fileNodeProcessor : processorMap.values()) {
+// fileNodeProcessor.checkAllClosingProcessors();
+// }
+// }, 0, 3000, TimeUnit.MILLISECONDS);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index ef36dfa..df31499 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -2124,38 +2124,38 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
}
- /**
- * wait for all closing processors finishing their tasks
- */
- public void waitforAllClosed() throws FileNodeProcessorException {
- close();
- while (getClosingBufferWriteProcessor().size() != 0) {
- checkAllClosingProcessors();
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Filenode Processor {} is interrupted when waiting for all closed.", processorName, e);
- }
- }
- }
-
-
- void checkAllClosingProcessors() {
- Iterator<BufferWriteProcessor> iterator =
- this.getClosingBufferWriteProcessor().iterator();
- while (iterator.hasNext()) {
- BufferWriteProcessor processor = iterator.next();
- try {
- if (processor.getCloseFuture().get(10, TimeUnit.MILLISECONDS)) {
- //if finished, we can remove it.
- iterator.remove();
- }
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Close bufferwrite processor {} failed.", processor.getProcessorName(), e);
- } catch (TimeoutException e) {
- //do nothing.
- }
- }
- this.getClosingBufferWriteProcessor().reset();
- }
+// /**
+// * wait for all closing processors finishing their tasks
+// */
+// public void waitforAllClosed() throws FileNodeProcessorException {
+// close();
+// while (getClosingBufferWriteProcessor().size() != 0) {
+// checkAllClosingProcessors();
+// try {
+// Thread.sleep(10);
+// } catch (InterruptedException e) {
+// LOGGER.error("Filenode Processor {} is interrupted when waiting for all closed.", processorName, e);
+// }
+// }
+// }
+
+
+// void checkAllClosingProcessors() {
+// Iterator<BufferWriteProcessor> iterator =
+// this.getClosingBufferWriteProcessor().iterator();
+// while (iterator.hasNext()) {
+// BufferWriteProcessor processor = iterator.next();
+// try {
+// if (processor.getCloseFuture().get(10, TimeUnit.MILLISECONDS)) {
+// //if finished, we can remove it.
+// iterator.remove();
+// }
+// } catch (InterruptedException | ExecutionException e) {
+// LOGGER.error("Close bufferwrite processor {} failed.", processor.getProcessorName(), e);
+// } catch (TimeoutException e) {
+// //do nothing.
+// }
+// }
+// this.getClosingBufferWriteProcessor().reset();
+// }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
index 90f0423..e5c908c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
@@ -43,7 +43,7 @@ public class OverflowIO extends TsFileIOWriter {
}
public void clearRowGroupMetadatas() {
- super.chunkGroupMetaDataList.clear();
+ super.flushedChunkGroupMetaDataList.clear();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index e7167fb..d56eb3f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -232,7 +232,7 @@ public class OverflowResource {
long lastPosition = insertIO.getPos();
// MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
// versionController.nextVersion());
- MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName);
+ MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName, 0L, (a, b) -> {});
task.flushMemTable(fileSchema, memTable, versionController.nextVersion());
List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
appendInsertMetadatas.addAll(rowGroupMetaDatas);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/BufferWriteRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/BufferWriteRecoverPerformer.java
deleted file mode 100644
index 9281029..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/BufferWriteRecoverPerformer.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.writelog.recover;
-
-import static org.apache.iotdb.db.writelog.RecoverStage.BACK_UP;
-import static org.apache.iotdb.db.writelog.RecoverStage.CLEAN_UP;
-import static org.apache.iotdb.db.writelog.RecoverStage.INIT;
-import static org.apache.iotdb.db.writelog.RecoverStage.RECOVER_FILE;
-import static org.apache.iotdb.db.writelog.RecoverStage.REPLAY_LOG;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.RecoverException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.writelog.RecoverStage;
-import org.apache.iotdb.db.writelog.io.RAFLogReader;
-import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
-import org.apache.iotdb.db.writelog.replay.ConcreteLogReplayer;
-import org.apache.iotdb.db.writelog.replay.LogReplayer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferWriteRecoverPerformer implements RecoverPerformer {
-
- public static final String RECOVER_FLAG_NAME = "recover-flag";
- public static final String RECOVER_SUFFIX = "-recover";
- public static final String FLAG_SEPERATOR = "-";
- private static final Logger logger = LoggerFactory.getLogger(BufferWriteRecoverPerformer.class);
- // The two fields can be made static only because the recovery is a serial process.
- private static RAFLogReader rafLogReader = new RAFLogReader();
-
- private File tsFile;
- private File restoreFile;
- private File[] logFiles;
-
- private String recoveryFlagPath;
-
- private RecoverStage currStage;
- private LogReplayer replayer = new ConcreteLogReplayer();
-
- /**
- * constructor of ExclusiveLogRecoverPerformer.
- */
- public BufferWriteRecoverPerformer(File tsFile, File restoreFile, File[] logFiles) {
- this.tsFile = tsFile;
- this.restoreFile = restoreFile;
- this.logFiles = logFiles;
- }
-
-
- public void setReplayer(LogReplayer replayer) {
- this.replayer = replayer;
- }
-
- @Override
- public void recover() throws RecoverException {
- currStage = determineStage();
- if (currStage != null) {
- recoverAtStage(currStage);
- }
- }
-
- private RecoverStage determineStage() throws RecoverException {
- File logDir = new File(writeLogNode.getLogDirectory());
- if (!logDir.exists()) {
- logger.error("Log node {} directory does not exist, recover failed",
- writeLogNode.getLogDirectory());
- throw new RecoverException("No directory for log node " + writeLogNode.getIdentifier());
- }
- // search for the flag file
- File[] files = logDir.listFiles((dir, name) -> name.contains(RECOVER_FLAG_NAME));
-
- if (files == null || files.length == 0) {
- File[] logFiles = logDir
- .listFiles((dir, name) -> name.contains(ExclusiveWriteLogNode.WAL_FILE_NAME));
- // no flag is set, and there exists log file, start from beginning.
- if (logFiles != null && logFiles.length > 0) {
- return RecoverStage.BACK_UP;
- } else {
- // no flag is set, and there is no log file, do not recover.
- return null;
- }
- }
-
- File flagFile = files[0];
- String flagName = flagFile.getName();
- recoveryFlagPath = flagFile.getPath();
- // the flag name is like "recover-flag-{flagType}"
- String[] parts = flagName.split(FLAG_SEPERATOR);
- if (parts.length != 3) {
- logger.error("Log node {} invalid recover flag name {}", writeLogNode.getIdentifier(),
- flagName);
- throw new RecoverException("Illegal recover flag " + flagName);
- }
- String stageName = parts[2];
- // if a flag of stage X is found, that means X had finished, so start from next stage
- if (stageName.equals(BACK_UP.name())) {
- return RECOVER_FILE;
- } else if (stageName.equals(REPLAY_LOG.name())) {
- return CLEAN_UP;
- } else {
- logger.error("Log node {} invalid recover flag name {}", writeLogNode.getIdentifier(),
- flagName);
- throw new RecoverException("Illegal recover flag " + flagName);
- }
- }
-
- private void recoverAtStage(RecoverStage stage) throws RecoverException {
- switch (stage) {
- case INIT:
- case BACK_UP:
- backup();
- break;
- case RECOVER_FILE:
- recoverFile();
- break;
- case REPLAY_LOG:
- replayLog();
- break;
- case CLEAN_UP:
- cleanup();
- break;
- default:
- logger.error("Invalid stage {}", stage);
- }
- }
-
- private void setFlag(RecoverStage stage) {
- if (recoveryFlagPath == null) {
- recoveryFlagPath =
- writeLogNode.getLogDirectory() + File.separator + RECOVER_FLAG_NAME + FLAG_SEPERATOR
- + stage.name();
- try {
- File flagFile = new File(recoveryFlagPath);
- if (!flagFile.createNewFile()) {
- logger
- .error("Log node {} cannot set flag at stage {}", writeLogNode.getLogDirectory(),
- stage.name());
- }
- } catch (IOException e) {
- logger.error("Log node {} cannot set flag at stage {}", writeLogNode.getLogDirectory(),
- stage.name(), e);
- }
- } else {
- File flagFile = new File(recoveryFlagPath);
- recoveryFlagPath = recoveryFlagPath.replace(FLAG_SEPERATOR + currStage.name(),
- FLAG_SEPERATOR + stage.name());
- if (!flagFile.renameTo(new File(recoveryFlagPath))) {
- logger
- .error("Log node {} cannot update flag at stage {}", writeLogNode.getLogDirectory(),
- stage.name());
- }
- }
- }
-
- private void cleanFlag() throws RecoverException {
- if (recoveryFlagPath != null) {
- File flagFile = new File(recoveryFlagPath);
- if (!flagFile.delete()) {
- logger.error("Log node {} cannot clean flag ", writeLogNode.getLogDirectory());
- throw new RecoverException("Cannot clean flag");
- }
- }
- }
-
- private void backup() throws RecoverException {
- String recoverRestoreFilePath = restoreFilePath + RECOVER_SUFFIX;
- File recoverRestoreFile = new File(recoverRestoreFilePath);
- File restoreFile = new File(restoreFilePath);
- if (!recoverRestoreFile.exists() && restoreFile.exists()) {
- try {
- FileUtils.copyFile(restoreFile, recoverRestoreFile);
- } catch (Exception e) {
- logger.error("Log node {} cannot backup restore file",
- writeLogNode.getLogDirectory(), e);
- throw new RecoverException("Cannot backup restore file, recovery aborted.");
- }
- }
-
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- File processorStoreFile = new File(processorStoreFilePath);
- if (!recoverProcessorStoreFile.exists() && processorStoreFile.exists()) {
- try {
- FileUtils.copyFile(processorStoreFile, recoverProcessorStoreFile);
- } catch (Exception e) {
- logger.error("Log node {} cannot backup processor file",
- writeLogNode.getLogDirectory(), e);
- throw new RecoverException("Cannot backup processor file, recovery aborted.");
- }
- }
-
- setFlag(BACK_UP);
- currStage = RECOVER_FILE;
- logger.info("Log node {} backup ended", writeLogNode.getLogDirectory());
- recoverFile();
- }
-
- private void recoverFile() throws RecoverException {
- String recoverRestoreFilePath = restoreFilePath + RECOVER_SUFFIX;
- File recoverRestoreFile = new File(recoverRestoreFilePath);
- try {
- if (recoverRestoreFile.exists()) {
- FileUtils.copyFile(recoverRestoreFile, new File(restoreFilePath));
- }
- } catch (Exception e) {
- logger.error("Log node {} cannot recover restore file.",
- writeLogNode.getLogDirectory(), e);
- throw new RecoverException("Cannot recover restore file, recovery aborted.");
- }
-
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- try {
- if (recoverProcessorStoreFile.exists()) {
- FileUtils.copyFile(recoverProcessorStoreFile, new File(processorStoreFilePath));
- }
- } catch (Exception e) {
- throw new RecoverException(String.format("Log node %s cannot recover processor file,"
- + " recovery aborted.", writeLogNode.getLogDirectory()), e);
- }
-
- fileNodeRecoverPerformer.recover();
-
- currStage = REPLAY_LOG;
- logger.info("Log node {} recover files ended", writeLogNode.getLogDirectory());
- replayLog();
- }
-
- private int replayLogFile(File logFile) throws RecoverException, IOException {
- int failedCnt = 0;
- if (logFile.exists()) {
- try {
- rafLogReader.open(logFile);
- } catch (FileNotFoundException e) {
- logger
- .error("Log node {} cannot read old log file, because ", writeLogNode.getIdentifier(),
- e);
- throw new RecoverException("Cannot read old log file, recovery aborted.");
- }
- while (rafLogReader.hasNext()) {
- try {
- PhysicalPlan physicalPlan = rafLogReader.next();
- if (physicalPlan == null) {
- logger.error("Log node {} read a bad log", writeLogNode.getIdentifier());
- throw new RecoverException("Cannot read old log file, recovery aborted.");
- }
- replayer.replay(physicalPlan, isOverflow);
- } catch (ProcessorException e) {
- failedCnt++;
- logger.error("Log node {}", writeLogNode.getLogDirectory(), e);
- }
- }
- rafLogReader.close();
- }
- return failedCnt;
- }
-
- private void replayLog() throws RecoverException {
- int failedEntryCnt = 0;
- // if old log file exists, replay it first.
- File logFolder = new File(writeLogNode.getLogDirectory());
- for (File file : logFolder.listFiles( name -> name.getName().startsWith(ExclusiveWriteLogNode.WAL_FILE_NAME
- + ExclusiveWriteLogNode.OLD_SUFFIX))) {
- try {
- failedEntryCnt += replayLogFile(file);
- } catch (IOException e) {
- throw new RecoverException(e);
- }
- }
-
- // then replay new log
- File newLogFile = new File(
- writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
- try {
- failedEntryCnt += replayLogFile(newLogFile);
- } catch (IOException e) {
- throw new RecoverException(e);
- }
- // TODO : do we need to proceed if there are failed logs ?
- if (failedEntryCnt > 0) {
- throw new RecoverException(
- "There are " + failedEntryCnt
- + " logs failed to recover, see logs above for details");
- }
- try {
- FileNodeManager.getInstance().closeOneFileNode(writeLogNode.getFileNodeName());
- } catch (FileNodeManagerException e) {
- throw new RecoverException(String.format("Log node %s cannot perform flush"
- + " after replaying logs!", writeLogNode.getIdentifier()), e);
- }
- currStage = CLEAN_UP;
- setFlag(REPLAY_LOG);
- logger.info("Log node {} replay ended.", writeLogNode.getLogDirectory());
- cleanup();
- }
-
- private void cleanup() throws RecoverException {
- // clean recovery files
- List<String> failedFiles = new ArrayList<>();
- String recoverRestoreFilePath = restoreFilePath + RECOVER_SUFFIX;
- File recoverRestoreFile = new File(recoverRestoreFilePath);
- if (recoverRestoreFile.exists() && !recoverRestoreFile.delete()) {
- logger
- .error("Log node {} cannot delete backup restore file", writeLogNode.getLogDirectory());
- failedFiles.add(recoverRestoreFilePath);
- }
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- if (recoverProcessorStoreFile.exists() && !recoverProcessorStoreFile.delete()) {
- logger.error("Log node {} cannot delete backup processor store file",
- writeLogNode.getLogDirectory());
- failedFiles.add(recoverProcessorStoreFilePath);
- }
- // clean log file
- File logFolder = new File(writeLogNode.getLogDirectory());
- for (File file : logFolder.listFiles( name -> name.getName().startsWith(ExclusiveWriteLogNode.WAL_FILE_NAME
- + ExclusiveWriteLogNode.OLD_SUFFIX))) {
- if (file.exists() && !file.delete()) {
- logger.error("Log node {} cannot delete old log file", writeLogNode.getLogDirectory());
- failedFiles.add(file.getPath());
- }
- }
-
- File newLogFile = new File(
- writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
- if (newLogFile.exists() && !newLogFile.delete()) {
- logger.error("Log node {} cannot delete new log file", writeLogNode.getLogDirectory());
- failedFiles.add(newLogFile.getPath());
- }
- if (!failedFiles.isEmpty()) {
- throw new RecoverException(
- "File clean failed. Failed files are " + failedFiles.toString());
- }
- // clean flag
- currStage = INIT;
- cleanFlag();
- logger.info("Log node {} cleanup ended.", writeLogNode.getLogDirectory());
- }
-}