You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/25 04:41:50 UTC

[iotdb] 02/02: Fix CI

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deregisterSourceHandle1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4cf0ad663a629fa92b74ffd23504659d17095393
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 24 20:18:28 2023 +0800

    Fix CI
---
 .../db/mpp/plan/execution/QueryExecution.java      | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8e18cbb6d5..08b0674b1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
@@ -393,6 +394,23 @@ public class QueryExecution implements IQueryExecution {
     // waiting it to be finished.
     if (resultHandle != null) {
       resultHandle.close();
+      cleanUpResultHandle();
+    }
+  }
+
+  private void cleanUpResultHandle() {
+    // Result handle belongs to special fragment instance, so we need to deregister it alone
+    // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
+    // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
+    // FragmentInstanceId to register
+    if (resultHandle instanceof SourceHandle) {
+      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
+      MPPDataExchangeService.getInstance()
+          .getMPPDataExchangeManager()
+          .deRegisterFragmentInstanceFromMemoryPool(
+              fragmentInstanceId.queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  fragmentInstanceId));
     }
   }
 
@@ -417,14 +435,7 @@ public class QueryExecution implements IQueryExecution {
       } else {
         resultHandle.close();
       }
-      // Result handle belongs to special fragment instance, so we need to deregister it alone
-      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
-      MPPDataExchangeService.getInstance()
-          .getMPPDataExchangeManager()
-          .deRegisterFragmentInstanceFromMemoryPool(
-              fragmentInstanceId.queryId,
-              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
-                  fragmentInstanceId));
+      cleanUpResultHandle();
     }
   }