You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/05 01:00:15 UTC
[iotdb] branch master updated: change state tracker from RPC to method call locally (#6898)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3a67962016 change state tracker from RPC to method call locally (#6898)
3a67962016 is described below
commit 3a679620165962a30fc0af63c42368189ab394fd
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Fri Aug 5 09:00:09 2022 +0800
change state tracker from RPC to method call locally (#6898)
---
.../scheduler/AbstractFragInsStateTracker.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 50031dc9b3..eb77ef7587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
@@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected QueryStateMachine stateMachine;
protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
+ protected final String localhostIpAddr;
+ protected final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
@@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
this.internalServiceClientManager = internalServiceClientManager;
+ this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+ this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
}
public abstract void start();
@@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected FragmentInstanceState fetchState(FragmentInstance instance)
throws TException, IOException {
TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TFragmentInstanceStateResp resp =
- client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
- return FragmentInstanceState.valueOf(resp.state);
+ if (isInstanceRunningLocally(endPoint)) {
+ return FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState();
+ } else {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TFragmentInstanceStateResp resp =
+ client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ }
}
}
+ private boolean isInstanceRunningLocally(TEndPoint endPoint) {
+ return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+ }
+
private TFragmentInstanceId getTId(FragmentInstance instance) {
return new TFragmentInstanceId(
instance.getId().getQueryId().getId(),