You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/26 13:48:08 UTC

[iotdb] 01/01: set correct InternalEndpoint and DataBlockEndpoint when planning

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_endpoint_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 279e6c9bba7e38cad29f35f32023c003b2d5e7a9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 26 21:47:54 2022 +0800

    set correct InternalEndpoint and DataBlockEndpoint when planning
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 25 ++++++++++++++++------
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 19 ++++++++++------
 .../iotdb/db/mpp/execution/QueryExecution.java     |  5 +----
 .../scheduler/AbstractFragInsStateTracker.java     |  6 +-----
 .../scheduler/SimpleFragInstanceDispatcher.java    |  6 +-----
 .../execution/scheduler/SimpleQueryTerminator.java |  8 ++-----
 .../db/mpp/sql/planner/DistributionPlanner.java    |  6 ++++--
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 13 +++--------
 .../sql/planner/SimpleFragmentParallelPlanner.java |  9 ++++++--
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 25 +++++++++++-----------
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 18 ++++++++++------
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 +++++-
 12 files changed, 79 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index da8d42c6de..110946f26c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -31,18 +31,25 @@ public class MPPQueryContext {
   private SessionInfo session;
   private QueryType queryType = QueryType.READ;
 
-  private TEndPoint hostEndpoint;
+  private TEndPoint localDataBlockEndpoint;
+  private TEndPoint localInternalEndpoint;
   private ResultNodeContext resultNodeContext;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, TEndPoint hostEndpoint) {
+  public MPPQueryContext(
+      String sql,
+      QueryId queryId,
+      SessionInfo session,
+      TEndPoint localDataBlockEndpoint,
+      TEndPoint localInternalEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
-    this.hostEndpoint = hostEndpoint;
+    this.localDataBlockEndpoint = localDataBlockEndpoint;
+    this.localInternalEndpoint = localInternalEndpoint;
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
 
@@ -58,11 +65,15 @@ public class MPPQueryContext {
     this.queryType = queryType;
   }
 
-  public TEndPoint getHostEndpoint() {
-    return hostEndpoint;
-  }
-
   public ResultNodeContext getResultNodeContext() {
     return resultNodeContext;
   }
+
+  public TEndPoint getLocalDataBlockEndpoint() {
+    return localDataBlockEndpoint;
+  }
+
+  public TEndPoint getLocalInternalEndpoint() {
+    return localInternalEndpoint;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..97f19d33e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -50,7 +50,12 @@ public class Coordinator {
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
-  private static final TEndPoint LOCAL_HOST =
+  private static final TEndPoint LOCAL_HOST_DATA_BLOCK_ENDPOINT =
+      new TEndPoint(
+          IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+          IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+
+  private static final TEndPoint LOCAL_HOST_INTERNAL_ENDPOINT =
       new TEndPoint(
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getInternalPort());
@@ -92,7 +97,12 @@ public class Coordinator {
     IQueryExecution execution =
         createQueryExecution(
             statement,
-            new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+            new MPPQueryContext(
+                sql,
+                queryId,
+                session,
+                LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+                LOCAL_HOST_INTERNAL_ENDPOINT),
             partitionFetcher,
             schemaFetcher);
     queryExecutionMap.put(queryId, execution);
@@ -116,11 +126,6 @@ public class Coordinator {
         COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
   }
 
-  // Get the hostname of current coordinator
-  private TEndPoint getHostEndpoint() {
-    return LOCAL_HOST;
-  }
-
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..6602f68382 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -291,9 +290,7 @@ public class QueryExecution implements IQueryExecution {
               .createSourceHandle(
                   context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                   context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                  new TEndPoint(
-                      context.getResultNodeContext().getUpStreamEndpoint().getIp(),
-                      IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+                  context.getLocalDataBlockEndpoint(),
                   context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                   stateMachine::transitionToFailed);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..b9c5c6dc38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -61,9 +59,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
     // TODO (jackie tien) change the port
     InternalService.Iface client =
         InternalServiceClientFactory.getInternalServiceClient(
-            new TEndPoint(
-                instance.getHostEndpoint().getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+            instance.getHostDataNode().internalEndPoint);
     TFragmentInstanceStateResp resp =
         client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
     return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..31a1bb6929 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -20,8 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
@@ -53,9 +51,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
             // TODO: (jackie tien) change the port
             InternalService.Iface client =
                 InternalServiceClientFactory.getInternalServiceClient(
-                    new TEndPoint(
-                        instance.getHostEndpoint().getIp(),
-                        IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                    instance.getHostDataNode().internalEndPoint);
             // TODO: (xingtanzjr) consider how to handle the buffer here
             ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
             instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..b834a01dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -58,10 +57,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             for (TEndPoint endpoint : relatedHost) {
               // TODO (jackie tien) change the port
               InternalService.Iface client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      new TEndPoint(
-                          endpoint.getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                  InternalServiceClientFactory.getInternalServiceClient(endpoint);
               client.cancelQuery(new TCancelQueryReq(queryId.getId()));
             }
           } catch (TException e) {
@@ -73,7 +69,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
     return instances.stream()
-        .map(FragmentInstance::getHostEndpoint)
+        .map(instance -> instance.getHostDataNode().internalEndPoint)
         .distinct()
         .collect(Collectors.toList());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 22b571ac3b..513941954e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -120,14 +120,16 @@ public class DistributionPlanner {
 
     FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
     sinkNode.setDownStream(
-        context.getHostEndpoint(),
+        context.getLocalDataBlockEndpoint(),
         context.getResultNodeContext().getVirtualFragmentInstanceId(),
         context.getResultNodeContext().getVirtualResultNodeId());
     sinkNode.setChild(rootInstance.getFragment().getRoot());
     context
         .getResultNodeContext()
         .setUpStream(
-            rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getPlanNodeId());
+            rootInstance.getHostDataNode().dataBlockManagerEndPoint,
+            rootInstance.getId(),
+            sinkNode.getPlanNodeId());
     rootInstance.getFragment().setRoot(sinkNode);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index cfa593146a..70cf5d296a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -347,15 +345,12 @@ public class LocalExecutionPlanner {
               SeriesScanOperator.class.getSimpleName());
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
-      TEndPoint source = node.getUpstreamEndpoint();
 
       ISourceHandle sourceHandle =
           DATA_BLOCK_MANAGER.createSourceHandle(
               localInstanceId.toThrift(),
               node.getPlanNodeId().getId(),
-              new TEndPoint(
-                  source.getIp(),
-                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              node.getUpstreamEndpoint(),
               remoteInstanceId.toThrift(),
               context.instanceContext::failed);
       return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
@@ -364,15 +359,13 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
       Operator child = node.getChild().accept(this, context);
-      TEndPoint target = node.getDownStreamEndpoint();
+
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
       ISinkHandle sinkHandle =
           DATA_BLOCK_MANAGER.createSinkHandle(
               localInstanceId.toThrift(),
-              new TEndPoint(
-                  target.getIp(),
-                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              node.getDownStreamEndpoint(),
               targetInstanceId.toThrift(),
               node.getDownStreamPlanNodeId().getId(),
               context.instanceContext);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
index 83cffa59cf..b23369479b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
@@ -116,13 +116,18 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
         PlanNodeId downStreamNodeId = sinkNode.getDownStreamPlanNodeId();
         FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
         sinkNode.setDownStream(
-            downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+            downStreamInstance.getHostDataNode().getDataBlockManagerEndPoint(),
+            downStreamInstance.getId(),
+            downStreamNodeId);
 
         // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
         PlanNode downStreamExchangeNode =
             downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
         ((ExchangeNode) downStreamExchangeNode)
-            .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getPlanNodeId());
+            .setUpstream(
+                instance.getHostDataNode().getDataBlockManagerEndPoint(),
+                instance.getId(),
+                sinkNode.getPlanNodeId());
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index b4a7819589..9a95356aca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -44,7 +44,7 @@ public class FragmentInstance implements IConsensusRequest {
   // The Region where the FragmentInstance should run
   private TRegionReplicaSet regionReplicaSet;
 
-  private TEndPoint hostEndpoint;
+  private TDataNodeLocation hostDataNode;
 
   private Filter timeFilter;
 
@@ -67,7 +67,7 @@ public class FragmentInstance implements IConsensusRequest {
     this.regionReplicaSet = regionReplicaSet;
     // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
     // instance
-    this.hostEndpoint = regionReplicaSet.getDataNodeLocations().get(0).getConsensusEndPoint();
+    this.hostDataNode = regionReplicaSet.getDataNodeLocations().get(0);
   }
 
   public TRegionReplicaSet getRegionReplicaSet() {
@@ -78,10 +78,6 @@ public class FragmentInstance implements IConsensusRequest {
     this.regionReplicaSet = regionReplicaSet;
   }
 
-  public TEndPoint getHostEndpoint() {
-    return hostEndpoint;
-  }
-
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -119,7 +115,8 @@ public class FragmentInstance implements IConsensusRequest {
     StringBuilder ret = new StringBuilder();
     ret.append(String.format("FragmentInstance-%s:", getId()));
     ret.append(
-        String.format("Host: %s", getHostEndpoint() == null ? "Not set" : getHostEndpoint()));
+        String.format(
+            "Host: %s", getHostDataNode() == null ? "Not set" : getHostDataNode().dataNodeId));
     ret.append(
         String.format(
             "Region: %s",
@@ -138,7 +135,7 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstance fragmentInstance =
         new FragmentInstance(planFragment, id, timeFilter, queryType);
     fragmentInstance.regionReplicaSet = ThriftCommonsSerDeUtils.readTRegionReplicaSet(buffer);
-    fragmentInstance.hostEndpoint = ThriftCommonsSerDeUtils.readTEndPoint(buffer);
+    fragmentInstance.hostDataNode = ThriftCommonsSerDeUtils.readTDataNodeLocation(buffer);
 
     return fragmentInstance;
   }
@@ -153,7 +150,7 @@ public class FragmentInstance implements IConsensusRequest {
     }
     ReadWriteIOUtils.write(type.ordinal(), buffer);
     ThriftCommonsSerDeUtils.writeTRegionReplicaSet(regionReplicaSet, buffer);
-    ThriftCommonsSerDeUtils.writeTEndPoint(hostEndpoint, buffer);
+    ThriftCommonsSerDeUtils.writeTDataNodeLocation(hostDataNode, buffer);
   }
 
   @Override
@@ -165,12 +162,16 @@ public class FragmentInstance implements IConsensusRequest {
         && type == instance.type
         && Objects.equals(fragment, instance.fragment)
         && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
-        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(hostDataNode, instance.hostDataNode)
         && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, regionReplicaSet, hostDataNode, timeFilter);
+  }
+
+  public TDataNodeLocation getHostDataNode() {
+    return hostDataNode;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 8aab414107..f8bfecbda2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -80,7 +80,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
@@ -100,7 +101,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -257,7 +259,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -294,7 +297,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
@@ -318,7 +322,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
@@ -360,7 +365,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 7640a45224..1cfd02be15 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -51,7 +51,11 @@ public class QueryPlannerTest {
         new QueryExecution(
             stmt,
             new MPPQueryContext(
-                querySql, new QueryId("query1"), new SessionInfo(), new TEndPoint()),
+                querySql,
+                new QueryId("query1"),
+                new SessionInfo(),
+                new TEndPoint(),
+                new TEndPoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),