You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/09 01:31:47 UTC
[iotdb] branch master updated: [IOTDB-4880] Fix memory leak in query (#7937)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe0fa38d8a [IOTDB-4880] Fix memory leak in query (#7937)
fe0fa38d8a is described below
commit fe0fa38d8a03257a00c6035f5a6877908d047df3
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Nov 9 09:31:42 2022 +0800
[IOTDB-4880] Fix memory leak in query (#7937)
---
.../fragment/FragmentInstanceExecution.java | 4 +++
.../fragment/FragmentInstanceManager.java | 30 +++++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 7a4841bc45..12e5da9231 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -93,6 +93,10 @@ public class FragmentInstanceExecution {
stateMachine.getState(), context.getEndTime(), context.getFailedCause());
}
+ public FragmentInstanceStateMachine getStateMachine() {
+ return stateMachine;
+ }
+
// this is a separate method to ensure that the `this` reference is not leaked during construction
private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1d7499560f..9f95052392 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -80,7 +80,7 @@ public class FragmentInstanceManager {
this.instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
- this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
+ this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
@@ -134,7 +134,19 @@ public class FragmentInstanceManager {
}
});
- return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+ if (execution != null) {
+ execution
+ .getStateMachine()
+ .addStateChangeListener(
+ newState -> {
+ if (newState.isDone()) {
+ instanceExecution.remove(instanceId);
+ }
+ });
+ return execution.getInstanceInfo();
+ } else {
+ return createFailedInstanceInfo(instanceId);
+ }
}
}
@@ -172,7 +184,19 @@ public class FragmentInstanceManager {
return null;
}
});
- return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+ if (execution != null) {
+ execution
+ .getStateMachine()
+ .addStateChangeListener(
+ newState -> {
+ if (newState.isDone()) {
+ instanceExecution.remove(instanceId);
+ }
+ });
+ return execution.getInstanceInfo();
+ } else {
+ return createFailedInstanceInfo(instanceId);
+ }
}
/** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */