You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/05 01:00:15 UTC

[iotdb] branch master updated: change state tracker from RPC to method call locally (#6898)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a67962016 change state tracker from RPC to method call locally (#6898)
3a67962016 is described below

commit 3a679620165962a30fc0af63c42368189ab394fd
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Fri Aug 5 09:00:09 2022 +0800

    change state tracker from RPC to method call locally (#6898)
---
 .../scheduler/AbstractFragInsStateTracker.java     | 24 +++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 50031dc9b3..eb77ef7587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
@@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected QueryStateMachine stateMachine;
   protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
+  protected final String localhostIpAddr;
+  protected final int localhostInternalPort;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
@@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
     this.internalServiceClientManager = internalServiceClientManager;
+    this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+    this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
   }
 
   public abstract void start();
@@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected FragmentInstanceState fetchState(FragmentInstance instance)
       throws TException, IOException {
     TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TFragmentInstanceStateResp resp =
-          client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-      return FragmentInstanceState.valueOf(resp.state);
+    if (isInstanceRunningLocally(endPoint)) {
+      return FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState();
+    } else {
+      try (SyncDataNodeInternalServiceClient client =
+          internalServiceClientManager.borrowClient(endPoint)) {
+        TFragmentInstanceStateResp resp =
+            client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+        return FragmentInstanceState.valueOf(resp.state);
+      }
     }
   }
 
+  private boolean isInstanceRunningLocally(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+  }
+
   private TFragmentInstanceId getTId(FragmentInstance instance) {
     return new TFragmentInstanceId(
         instance.getId().getQueryId().getId(),