You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 07:52:30 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix asyncFlushAndSealAllFiles in FileNodeManagerV2
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 29e7485 fix asyncFlushAndSealAllFiles in FileNodeManagerV2
29e7485 is described below
commit 29e74854b531fee767295be34313461505bc4d1f
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 15:52:18 2019 +0800
fix asyncFlushAndSealAllFiles in FileNodeManagerV2
---
.../db/engine/filenodeV2/FileNodeManagerV2.java | 39 +++++-----------------
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 7 ++--
2 files changed, 13 insertions(+), 33 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index dac26b8..dcd3924 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -66,7 +66,7 @@ public class FileNodeManagerV2 implements IService {
* This map is used to manage all filenode processor,<br> the key is filenode name which is
* storage group seriesPath.
*/
- private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
+ private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap = new ConcurrentHashMap<>();
private static final FileNodeManagerV2 INSTANCE = new FileNodeManagerV2();
@@ -85,19 +85,12 @@ public class FileNodeManagerV2 implements IService {
private FileNodeManagerV2() {
- String normalizedBaseDir = config.getFileNodeDir();
- if (normalizedBaseDir.charAt(normalizedBaseDir.length() - 1) != File.separatorChar) {
- normalizedBaseDir += Character.toString(File.separatorChar);
- }
- baseDir = normalizedBaseDir;
- processorMap = new ConcurrentHashMap<>();
-
+ baseDir = FilePathUtils.regularizePath(config.getFileNodeDir());
// create baseDir
File dir = new File(baseDir);
if (dir.mkdirs()) {
- LOGGER.info("baseDir {} doesn't exist, create it", dir.getPath());
+ LOGGER.info("Base directory {} of all storage groups doesn't exist, create it", dir.getPath());
}
-
}
@Override
@@ -182,31 +175,15 @@ public class FileNodeManagerV2 implements IService {
return fileNodeProcessor.insert(tsRecord);
}
- private void closeAllFileNodeProcessor() {
- synchronized (processorMap) {
- LOGGER.info("Start to setCloseMark all FileNode");
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- LOGGER.info(
- "Failed to setCloseMark all FileNode processor because the FileNodeManager's status is {}",
- fileNodeManagerStatus);
- return;
- }
-
- fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
-
- for (Map.Entry<String, FileNodeProcessorV2> processorEntry : processorMap.entrySet()) {
+ public void asyncFlushAndSealAllFiles() {
+ synchronized (processorMap) {
+ for (FileNodeProcessorV2 fileNodeProcessor: processorMap.values()) {
+ fileNodeProcessor.asyncForceClose();
}
-
}
}
- private void checkTimestamp(TSRecord tsRecord) throws FileNodeManagerException {
- if (tsRecord.time < 0) {
- LOGGER.error("The insert time lt 0, {}.", tsRecord);
- throw new FileNodeManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
- }
- }
/**
* recovery the filenode processor.
@@ -341,7 +318,7 @@ public class FileNodeManagerV2 implements IService {
private void deleteFileNodeBlocked(String processorName) throws IOException {
LOGGER.info("Forced to delete the filenode processor {}", processorName);
FileNodeProcessorV2 processor = processorMap.get(processorName);
- processor.syncCloseAndReleaseFileNode(() -> {
+ processor.syncCloseAndStopFileNode(() -> {
String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index f1b05fe..73dcc08 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -106,7 +106,10 @@ public class FileNodeProcessorV2 {
*/
try {
File storageGroupInfoDir = new File(baseDir, storageGroupName);
- storageGroupInfoDir.mkdirs();
+ if (storageGroupInfoDir.mkdirs()) {
+ LOGGER.info("Storage Group Info Directory {} doesn't exist, create it", storageGroupInfoDir.getPath());
+ }
+
versionController = new SimpleFileVersionController(
storageGroupInfoDir.getPath());
} catch (IOException e) {
@@ -352,7 +355,7 @@ public class FileNodeProcessorV2 {
/**
* This method will be blocked until this file node can be closed.
*/
- public void syncCloseAndReleaseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
+ public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
lock.writeLock().lock();
try {
asyncForceClose();