You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/06 09:30:10 UTC
[iotdb] branch fix_mem_control_2 updated: fix cannot close file
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fix_mem_control_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fix_mem_control_2 by this push:
new 9739a0f fix cannot close file
9739a0f is described below
commit 9739a0f5251699d992bd9a748931146d6fc81ee6
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 6 17:29:14 2020 +0800
fix cannot close file
---
.../iotdb/db/engine/storagegroup/StorageGroupInfo.java | 14 +++++++-------
.../db/engine/storagegroup/StorageGroupProcessor.java | 13 ++++++++-----
.../iotdb/db/engine/storagegroup/TsFileProcessor.java | 15 ++++++++++-----
.../main/java/org/apache/iotdb/db/rescon/SystemInfo.java | 6 ++++++
4 files changed, 31 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index e696a60..a31d41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -44,12 +44,12 @@ public class StorageGroupInfo {
private long storageGroupSizeReportThreshold =
IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
- private long lastReportedSize = 0L;
+ private AtomicLong lastReportedSize = new AtomicLong();
/**
* A set of all unclosed TsFileProcessors in this SG
*/
- private Set<TsFileProcessor> reportedTsps = new HashSet<>();
+ private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
this.storageGroupProcessor = storageGroupProcessor;
@@ -81,16 +81,16 @@ public class StorageGroupInfo {
return memoryCost.get();
}
- public Set<TsFileProcessor> getAllReportedTsp() {
+ public List<TsFileProcessor> getAllReportedTsp() {
return reportedTsps;
}
public boolean needToReportToSystem() {
- return memoryCost.get() - lastReportedSize > storageGroupSizeReportThreshold;
+ return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold;
}
public void setLastReportedSize(long size) {
- lastReportedSize = size;
+ lastReportedSize.set(size);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 17d09d6..be56a73 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -829,6 +829,10 @@ public class StorageGroupProcessor {
.put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
}
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ return true;
+ }
// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
@@ -914,13 +918,12 @@ public class StorageGroupProcessor {
}
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
- closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
- return;
- }
writeLock();
try {
- fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
+ !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ }
} finally {
writeUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e5f083..7ba02eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -493,15 +493,15 @@ public class TsFileProcessor {
}
try {
- if (logger.isInfoEnabled()) {
+ if (logger.isDebugEnabled()) {
if (workMemTable != null) {
- logger.info(
+ logger.debug(
"{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
workMemTable.memSize(),
tsFileResource.getTsFileSize());
} else {
- logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
+ logger.debug("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
tsFileResource.getTsFileSize());
}
@@ -676,13 +676,18 @@ public class TsFileProcessor {
}
memTable.release();
if (enableMemControl) {
- // For text type data, reset the mem cost in tsFileProcessorInfo
+ // reset the mem cost in StorageGroupProcessorInfo
storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+ if (logger.isDebugEnabled()) {
+ logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, "
+ + "flushing memtable list size: {}", storageGroupName,
+ tsFileResource.getTsFile().getName(), flushingMemTables.size());
+ }
// report to System
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
+ logger.debug("[mem_control] {}: {} flush finished, remove a memtable from flushing list, "
+ "flushing memtable list size: {}", storageGroupName,
tsFileResource.getTsFile().getName(), flushingMemTables.size());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index aa32467..74c00f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -147,7 +147,13 @@ public class SystemInfo {
* Be Careful!! This method can only be called by flush thread!
*/
private void forceAsyncFlush() {
+ if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+ return;
+ }
List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
+ if (logger.isDebugEnabled()) {
+ logger.debug("[mem control] get {} tsp to flush", processors.size());
+ }
for (TsFileProcessor processor : processors) {
if (processor != null) {
processor.startAsyncFlush();