You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/25 04:41:48 UTC

[iotdb] branch deregisterSourceHandle1.1 created (now 4cf0ad663a)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch deregisterSourceHandle1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 4cf0ad663a Fix CI

This branch includes the following new commits:

     new 38b6252376 deregister result handle
     new 4cf0ad663a Fix CI

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: deregister result handle

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deregisterSourceHandle1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 38b6252376b297f8926cd3578f226ef6b3c2c521
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Apr 24 16:22:06 2023 +0800

    deregister result handle
---
 .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 94fb69067e..8e18cbb6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.KilledByOthersException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.QueryState;
@@ -62,6 +63,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -415,6 +417,14 @@ public class QueryExecution implements IQueryExecution {
       } else {
         resultHandle.close();
       }
+      // Result handle belongs to special fragment instance, so we need to deregister it alone
+      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
+      MPPDataExchangeService.getInstance()
+          .getMPPDataExchangeManager()
+          .deRegisterFragmentInstanceFromMemoryPool(
+              fragmentInstanceId.queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  fragmentInstanceId));
     }
   }
 


[iotdb] 02/02: Fix CI

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deregisterSourceHandle1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4cf0ad663a629fa92b74ffd23504659d17095393
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 24 20:18:28 2023 +0800

    Fix CI
---
 .../db/mpp/plan/execution/QueryExecution.java      | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8e18cbb6d5..08b0674b1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
@@ -393,6 +394,23 @@ public class QueryExecution implements IQueryExecution {
     // waiting it to be finished.
     if (resultHandle != null) {
       resultHandle.close();
+      cleanUpResultHandle();
+    }
+  }
+
+  private void cleanUpResultHandle() {
+    // The result handle belongs to a special fragment instance, so deregister it separately.
+    // MemorySourceHandle needs no handling because it never registers with the memory pool.
+    // LocalSourceHandle needs no handling because its SharedTsBlockQueue registers under the
+    // upstream FragmentInstanceId.
+    if (resultHandle instanceof SourceHandle) {
+      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
+      MPPDataExchangeService.getInstance()
+          .getMPPDataExchangeManager()
+          .deRegisterFragmentInstanceFromMemoryPool(
+              fragmentInstanceId.queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  fragmentInstanceId));
     }
   }
 
@@ -417,14 +435,7 @@ public class QueryExecution implements IQueryExecution {
       } else {
         resultHandle.close();
       }
-      // Result handle belongs to special fragment instance, so we need to deregister it alone
-      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
-      MPPDataExchangeService.getInstance()
-          .getMPPDataExchangeManager()
-          .deRegisterFragmentInstanceFromMemoryPool(
-              fragmentInstanceId.queryId,
-              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
-                  fragmentInstanceId));
+      cleanUpResultHandle();
     }
   }