You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/09 01:31:47 UTC

[iotdb] branch master updated: [IOTDB-4880] Fix memory leak in query (#7937)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fe0fa38d8a [IOTDB-4880] Fix memory leak in query (#7937)
fe0fa38d8a is described below

commit fe0fa38d8a03257a00c6035f5a6877908d047df3
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Nov 9 09:31:42 2022 +0800

    [IOTDB-4880] Fix memory leak in query (#7937)
---
 .../fragment/FragmentInstanceExecution.java        |  4 +++
 .../fragment/FragmentInstanceManager.java          | 30 +++++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 7a4841bc45..12e5da9231 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -93,6 +93,10 @@ public class FragmentInstanceExecution {
         stateMachine.getState(), context.getEndTime(), context.getFailedCause());
   }
 
+  public FragmentInstanceStateMachine getStateMachine() {
+    return stateMachine; // same instance this execution uses; callers attach listeners to it
+  }
+
   // this is a separate method to ensure that the `this` reference is not leaked during construction
   private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
     requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1d7499560f..9f95052392 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -80,7 +80,7 @@ public class FragmentInstanceManager {
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
 
-    this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
+    this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
@@ -134,7 +134,19 @@ public class FragmentInstanceManager {
                 }
               });
 
-      return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+      if (execution != null) {
+        execution
+            .getStateMachine()
+            .addStateChangeListener(
+                newState -> {
+                  if (newState.isDone()) {
+                    instanceExecution.remove(instanceId);
+                  }
+                });
+        return execution.getInstanceInfo();
+      } else {
+        return createFailedInstanceInfo(instanceId);
+      }
     }
   }
 
@@ -172,7 +184,19 @@ public class FragmentInstanceManager {
                 return null;
               }
             });
-    return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+    if (execution != null) {
+      execution
+          .getStateMachine()
+          .addStateChangeListener(
+              newState -> {
+                if (newState.isDone()) {
+                  instanceExecution.remove(instanceId);
+                }
+              });
+      return execution.getInstanceInfo();
+    } else {
+      return createFailedInstanceInfo(instanceId);
+    }
   }
 
   /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */