You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/05/24 02:20:33 UTC

[iotdb] 01/01: Avoid RPC invocation in SimpleQueryTerminator when the endpoint is the local address

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch opitimize_query_terminator_in_local
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit af7974ef0e44df9bc55105ee469b7813df3c2076
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed May 24 10:20:13 2023 +0800

    Avoid RPC invocation in SimpleQueryTerminator when the endpoint is the local address
---
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 27 ++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 841abf4deae..be1712c6c6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -23,8 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -101,6 +104,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (TFragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance()
+              .cancelTask(FragmentInstanceId.fromThrift(insId), false);
+        }
+        return true;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false));
@@ -126,6 +141,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (TFragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance()
+              .cancelTask(FragmentInstanceId.fromThrift(insId), true);
+        }
+        return true;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true));