You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 06:37:15 UTC
[incubator-iotdb] 01/02: fix FNP insert
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e8dc717e5abae29b6f05d772852bf61ec6dc4902
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 14:01:27 2019 +0800
fix FNP insert
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 66 +++++++++++-----------
1 file changed, 34 insertions(+), 32 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index c1cad5b..e5902c1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.engine.AbstractUnsealedDataFileProcessorV2;
import org.apache.iotdb.db.engine.bufferwriteV2.BufferWriteProcessorV2;
import org.apache.iotdb.db.engine.filenode.CopyOnWriteLinkedList;
import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
+import org.apache.iotdb.db.engine.overflowV2.OverflowProcessorV2;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
@@ -73,7 +74,7 @@ public class FileNodeProcessorV2 {
// for overflow
private List<TsFileResourceV2> unsequenceFileList;
- private BufferWriteProcessorV2 workOverflowProcessor = null;
+ private OverflowProcessorV2 workOverflowProcessor = null;
private CopyOnWriteLinkedList<BufferWriteProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
/**
@@ -123,8 +124,7 @@ public class FileNodeProcessorV2 {
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
- LOGGER.error(
- "The fileNode processor {} encountered an error when recoverying restore " +
+ LOGGER.error("The fileNode processor {} encountered an error when recoverying restore " +
"information.", storageGroup);
throw new FileNodeProcessorException(e);
}
@@ -176,24 +176,51 @@ public class FileNodeProcessorV2 {
}
}
- private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor, TSRecord tsRecord, boolean sequence) {
+ public boolean insert(TSRecord tsRecord) {
+ lock.writeLock().lock();
+ boolean result = true;
+
+ try {
+ // init map
+ latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+ latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+
+ // write to sequence or unsequence file
+ if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
+ writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
+ } else {
+ writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
+ }
+ } catch (Exception e) {
+ LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
+ result = false;
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return result;
+ }
+
+
+ private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor,
+ TSRecord tsRecord, boolean sequence) throws IOException {
boolean result;
// create a new BufferWriteProcessor
if (udfProcessor == null) {
if (sequence) {
String baseDir = directories.getNextFolderForTsfile();
String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
- workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+ udfProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
sequenceFileList.add(udfProcessor.getTsFileResource());
} else {
+ // TODO check if the disk is full
String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
- workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+ udfProcessor = new OverflowProcessorV2(storageGroup, new File(filePath),
fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
unsequenceFileList.add(udfProcessor.getTsFileResource());
}
- // TODO check if the disk is full
}
// write BufferWrite
@@ -210,31 +237,6 @@ public class FileNodeProcessorV2 {
}
}
- public boolean insert(TSRecord tsRecord) {
- lock.writeLock().lock();
- boolean result = true;
-
- try {
-
- // init map
- latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
- latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
-
- if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
-
-
-
- } else {
- // TODO write to overflow
- }
- } catch (Exception e) {
-
- } finally {
- lock.writeLock().unlock();
- }
-
- return result;
- }
/**
* ensure there must be a flush thread submitted after close() is called,