You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/04 15:38:57 UTC
[iotdb] branch xingtanzjr/discard_rpc_new_server created (now de1d165b48)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a change to branch xingtanzjr/discard_rpc_new_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at de1d165b48 change state tracker from RPC to method call locally
This branch includes the following new commits:
new de1d165b48 change state tracker from RPC to method call locally
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: change state tracker from RPC to method call locally
Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/discard_rpc_new_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de1d165b4838fd87c49d330e55b1af6fe3f484c7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Aug 4 23:30:16 2022 +0800
change state tracker from RPC to method call locally
---
.../scheduler/AbstractFragInsStateTracker.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 50031dc9b3..eb77ef7587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
@@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected QueryStateMachine stateMachine;
protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
+ protected final String localhostIpAddr;
+ protected final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
@@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
this.internalServiceClientManager = internalServiceClientManager;
+ this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+ this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
}
public abstract void start();
@@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected FragmentInstanceState fetchState(FragmentInstance instance)
throws TException, IOException {
TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TFragmentInstanceStateResp resp =
- client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
- return FragmentInstanceState.valueOf(resp.state);
+ if (isInstanceRunningLocally(endPoint)) {
+ return FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState();
+ } else {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TFragmentInstanceStateResp resp =
+ client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ }
}
}
+ private boolean isInstanceRunningLocally(TEndPoint endPoint) {
+ return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+ }
+
private TFragmentInstanceId getTId(FragmentInstance instance) {
return new TFragmentInstanceId(
instance.getId().getQueryId().getId(),