You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/06 03:11:54 UTC
[iotdb] 01/01: fix failure to flush when the system rejects writes
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch mcbf
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4e63e435da80bda56bb61ab66e709b71cf9a06e2
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 6 00:36:08 2020 +0800
Fix failure to flush when the system rejects writes
---
.../org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java | 8 --------
.../iotdb/db/engine/storagegroup/StorageGroupProcessor.java | 10 ++++++++++
.../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 2 ++
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index f2fb524..d760819 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.engine.flush;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
@@ -35,19 +33,13 @@ public interface TsFileFlushPolicy {
class DirectFlushPolicy implements TsFileFlushPolicy {
- private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);
-
@Override
public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
boolean isSeq) {
if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
- logger.info("Async close tsfile: {}",
- tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
} else {
tsFileProcessor.asyncFlush();
- logger.info("Async flush a memtable to tsfile: {}",
- tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e869143..17d09d6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -914,6 +914,10 @@ public class StorageGroupProcessor {
}
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ return;
+ }
writeLock();
try {
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
@@ -1089,6 +1093,12 @@ public class StorageGroupProcessor {
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ return;
+ }
+ logger.info("Async close tsfile: {}",
+ tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
updateEndTimeMap(tsFileProcessor);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e26f0b..5e5f083 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -602,6 +602,8 @@ public class TsFileProcessor {
if (workMemTable == null) {
return;
}
+ logger.info("Async flush a memtable to tsfile: {}",
+ tsFileResource.getTsFile().getAbsolutePath());
addAMemtableIntoFlushingList(workMemTable);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,