You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/25 04:41:50 UTC
[iotdb] 02/02: Fix CI
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch deregisterSourceHandle1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4cf0ad663a629fa92b74ffd23504659d17095393
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 24 20:18:28 2023 +0800
Fix CI
---
.../db/mpp/plan/execution/QueryExecution.java | 27 +++++++++++++++-------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8e18cbb6d5..08b0674b1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
@@ -393,6 +394,23 @@ public class QueryExecution implements IQueryExecution {
// waiting it to be finished.
if (resultHandle != null) {
resultHandle.close();
+ cleanUpResultHandle();
+ }
+ }
+
+ private void cleanUpResultHandle() {
+ // The result handle belongs to a special fragment instance, so deregister it separately.
+ // MemorySourceHandle needs no handling because it does not register with the memory pool.
+ // LocalSourceHandle needs no handling because the SharedTsBlockQueue registers under the
+ // upstream FragmentInstanceId.
+ if (resultHandle instanceof SourceHandle) {
+ TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
+ MPPDataExchangeService.getInstance()
+ .getMPPDataExchangeManager()
+ .deRegisterFragmentInstanceFromMemoryPool(
+ fragmentInstanceId.queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ fragmentInstanceId));
}
}
@@ -417,14 +435,7 @@ public class QueryExecution implements IQueryExecution {
} else {
resultHandle.close();
}
- // Result handle belongs to special fragment instance, so we need to deregister it alone
- TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
- MPPDataExchangeService.getInstance()
- .getMPPDataExchangeManager()
- .deRegisterFragmentInstanceFromMemoryPool(
- fragmentInstanceId.queryId,
- FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
- fragmentInstanceId));
+ cleanUpResultHandle();
}
}