You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/04 15:38:57 UTC

[iotdb] branch xingtanzjr/discard_rpc_new_server created (now de1d165b48)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/discard_rpc_new_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at de1d165b48 change state tracker from RPC to method call locally

This branch includes the following new commits:

     new de1d165b48 change state tracker from RPC to method call locally

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change state tracker from RPC to method call locally

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/discard_rpc_new_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de1d165b4838fd87c49d330e55b1af6fe3f484c7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Aug 4 23:30:16 2022 +0800

    change state tracker from RPC to method call locally
---
 .../scheduler/AbstractFragInsStateTracker.java     | 24 +++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 50031dc9b3..eb77ef7587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
@@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected QueryStateMachine stateMachine;
   protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
+  protected final String localhostIpAddr;
+  protected final int localhostInternalPort;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
@@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
     this.internalServiceClientManager = internalServiceClientManager;
+    this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+    this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
   }
 
   public abstract void start();
@@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected FragmentInstanceState fetchState(FragmentInstance instance)
       throws TException, IOException {
     TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TFragmentInstanceStateResp resp =
-          client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-      return FragmentInstanceState.valueOf(resp.state);
+    if (isInstanceRunningLocally(endPoint)) {
+      return FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState();
+    } else {
+      try (SyncDataNodeInternalServiceClient client =
+          internalServiceClientManager.borrowClient(endPoint)) {
+        TFragmentInstanceStateResp resp =
+            client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+        return FragmentInstanceState.valueOf(resp.state);
+      }
     }
   }
 
+  private boolean isInstanceRunningLocally(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+  }
+
   private TFragmentInstanceId getTId(FragmentInstance instance) {
     return new TFragmentInstanceId(
         instance.getId().getQueryId().getId(),