You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/08 12:14:28 UTC
[iotdb] 01/01: Fix memory leak in query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryMemoryLeaky
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4696cfd24d23dd3dcc8ba17b3a630a8855db1a01
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Nov 8 20:07:47 2022 +0800
Fix memory leak in query
---
.../fragment/FragmentInstanceExecution.java | 4 +++
.../fragment/FragmentInstanceManager.java | 30 +++++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 7a4841bc45..12e5da9231 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -93,6 +93,10 @@ public class FragmentInstanceExecution {
stateMachine.getState(), context.getEndTime(), context.getFailedCause());
}
+ public FragmentInstanceStateMachine getStateMachine() {
+ return stateMachine;
+ }
+
// this is a separate method to ensure that the `this` reference is not leaked during construction
private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1d7499560f..9f95052392 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -80,7 +80,7 @@ public class FragmentInstanceManager {
this.instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
- this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
+ this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
@@ -134,7 +134,19 @@ public class FragmentInstanceManager {
}
});
- return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+ if (execution != null) {
+ execution
+ .getStateMachine()
+ .addStateChangeListener(
+ newState -> {
+ if (newState.isDone()) {
+ instanceExecution.remove(instanceId);
+ }
+ });
+ return execution.getInstanceInfo();
+ } else {
+ return createFailedInstanceInfo(instanceId);
+ }
}
}
@@ -172,7 +184,19 @@ public class FragmentInstanceManager {
return null;
}
});
- return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+ if (execution != null) {
+ execution
+ .getStateMachine()
+ .addStateChangeListener(
+ newState -> {
+ if (newState.isDone()) {
+ instanceExecution.remove(instanceId);
+ }
+ });
+ return execution.getInstanceInfo();
+ } else {
+ return createFailedInstanceInfo(instanceId);
+ }
}
/** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */