You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/26 13:48:07 UTC

[iotdb] branch xingtanzjr/fix_endpoint_mpp created (now 279e6c9bba)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/fix_endpoint_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 279e6c9bba set correct InternalEndpoint and DataBlockEndpoint when planning

This branch includes the following new commits:

     new 279e6c9bba set correct InternalEndpoint and DataBlockEndpoint when planning

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: set correct InternalEndpoint and DataBlockEndpoint when planning

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_endpoint_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 279e6c9bba7e38cad29f35f32023c003b2d5e7a9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 26 21:47:54 2022 +0800

    set correct InternalEndpoint and DataBlockEndpoint when planning
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 25 ++++++++++++++++------
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 19 ++++++++++------
 .../iotdb/db/mpp/execution/QueryExecution.java     |  5 +----
 .../scheduler/AbstractFragInsStateTracker.java     |  6 +-----
 .../scheduler/SimpleFragInstanceDispatcher.java    |  6 +-----
 .../execution/scheduler/SimpleQueryTerminator.java |  8 ++-----
 .../db/mpp/sql/planner/DistributionPlanner.java    |  6 ++++--
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 13 +++--------
 .../sql/planner/SimpleFragmentParallelPlanner.java |  9 ++++++--
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 25 +++++++++++-----------
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 18 ++++++++++------
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 +++++-
 12 files changed, 79 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index da8d42c6de..110946f26c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -31,18 +31,25 @@ public class MPPQueryContext {
   private SessionInfo session;
   private QueryType queryType = QueryType.READ;
 
-  private TEndPoint hostEndpoint;
+  private TEndPoint localDataBlockEndpoint;
+  private TEndPoint localInternalEndpoint;
   private ResultNodeContext resultNodeContext;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, TEndPoint hostEndpoint) {
+  public MPPQueryContext(
+      String sql,
+      QueryId queryId,
+      SessionInfo session,
+      TEndPoint localDataBlockEndpoint,
+      TEndPoint localInternalEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
-    this.hostEndpoint = hostEndpoint;
+    this.localDataBlockEndpoint = localDataBlockEndpoint;
+    this.localInternalEndpoint = localInternalEndpoint;
     this.resultNodeContext = new ResultNodeContext(queryId);
   }
 
@@ -58,11 +65,15 @@ public class MPPQueryContext {
     this.queryType = queryType;
   }
 
-  public TEndPoint getHostEndpoint() {
-    return hostEndpoint;
-  }
-
   public ResultNodeContext getResultNodeContext() {
     return resultNodeContext;
   }
+
+  public TEndPoint getLocalDataBlockEndpoint() {
+    return localDataBlockEndpoint;
+  }
+
+  public TEndPoint getLocalInternalEndpoint() {
+    return localInternalEndpoint;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..97f19d33e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -50,7 +50,12 @@ public class Coordinator {
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
-  private static final TEndPoint LOCAL_HOST =
+  private static final TEndPoint LOCAL_HOST_DATA_BLOCK_ENDPOINT =
+      new TEndPoint(
+          IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+          IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+
+  private static final TEndPoint LOCAL_HOST_INTERNAL_ENDPOINT =
       new TEndPoint(
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getInternalPort());
@@ -92,7 +97,12 @@ public class Coordinator {
     IQueryExecution execution =
         createQueryExecution(
             statement,
-            new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+            new MPPQueryContext(
+                sql,
+                queryId,
+                session,
+                LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+                LOCAL_HOST_INTERNAL_ENDPOINT),
             partitionFetcher,
             schemaFetcher);
     queryExecutionMap.put(queryId, execution);
@@ -116,11 +126,6 @@ public class Coordinator {
         COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
   }
 
-  // Get the hostname of current coordinator
-  private TEndPoint getHostEndpoint() {
-    return LOCAL_HOST;
-  }
-
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..6602f68382 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -291,9 +290,7 @@ public class QueryExecution implements IQueryExecution {
               .createSourceHandle(
                   context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                   context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                  new TEndPoint(
-                      context.getResultNodeContext().getUpStreamEndpoint().getIp(),
-                      IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+                  context.getLocalDataBlockEndpoint(),
                   context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                   stateMachine::transitionToFailed);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..b9c5c6dc38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -61,9 +59,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
     // TODO (jackie tien) change the port
     InternalService.Iface client =
         InternalServiceClientFactory.getInternalServiceClient(
-            new TEndPoint(
-                instance.getHostEndpoint().getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+            instance.getHostDataNode().internalEndPoint);
     TFragmentInstanceStateResp resp =
         client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
     return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..31a1bb6929 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -20,8 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
@@ -53,9 +51,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
             // TODO: (jackie tien) change the port
             InternalService.Iface client =
                 InternalServiceClientFactory.getInternalServiceClient(
-                    new TEndPoint(
-                        instance.getHostEndpoint().getIp(),
-                        IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                    instance.getHostDataNode().internalEndPoint);
             // TODO: (xingtanzjr) consider how to handle the buffer here
             ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
             instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..b834a01dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -58,10 +57,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             for (TEndPoint endpoint : relatedHost) {
               // TODO (jackie tien) change the port
               InternalService.Iface client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      new TEndPoint(
-                          endpoint.getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                  InternalServiceClientFactory.getInternalServiceClient(endpoint);
               client.cancelQuery(new TCancelQueryReq(queryId.getId()));
             }
           } catch (TException e) {
@@ -73,7 +69,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
     return instances.stream()
-        .map(FragmentInstance::getHostEndpoint)
+        .map(instance -> instance.getHostDataNode().internalEndPoint)
         .distinct()
         .collect(Collectors.toList());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 22b571ac3b..513941954e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -120,14 +120,16 @@ public class DistributionPlanner {
 
     FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
     sinkNode.setDownStream(
-        context.getHostEndpoint(),
+        context.getLocalDataBlockEndpoint(),
         context.getResultNodeContext().getVirtualFragmentInstanceId(),
         context.getResultNodeContext().getVirtualResultNodeId());
     sinkNode.setChild(rootInstance.getFragment().getRoot());
     context
         .getResultNodeContext()
         .setUpStream(
-            rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getPlanNodeId());
+            rootInstance.getHostDataNode().dataBlockManagerEndPoint,
+            rootInstance.getId(),
+            sinkNode.getPlanNodeId());
     rootInstance.getFragment().setRoot(sinkNode);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index cfa593146a..70cf5d296a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -347,15 +345,12 @@ public class LocalExecutionPlanner {
               SeriesScanOperator.class.getSimpleName());
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
-      TEndPoint source = node.getUpstreamEndpoint();
 
       ISourceHandle sourceHandle =
           DATA_BLOCK_MANAGER.createSourceHandle(
               localInstanceId.toThrift(),
               node.getPlanNodeId().getId(),
-              new TEndPoint(
-                  source.getIp(),
-                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              node.getUpstreamEndpoint(),
               remoteInstanceId.toThrift(),
               context.instanceContext::failed);
       return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
@@ -364,15 +359,13 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
       Operator child = node.getChild().accept(this, context);
-      TEndPoint target = node.getDownStreamEndpoint();
+
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
       ISinkHandle sinkHandle =
           DATA_BLOCK_MANAGER.createSinkHandle(
               localInstanceId.toThrift(),
-              new TEndPoint(
-                  target.getIp(),
-                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              node.getDownStreamEndpoint(),
               targetInstanceId.toThrift(),
               node.getDownStreamPlanNodeId().getId(),
               context.instanceContext);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
index 83cffa59cf..b23369479b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
@@ -116,13 +116,18 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
         PlanNodeId downStreamNodeId = sinkNode.getDownStreamPlanNodeId();
         FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
         sinkNode.setDownStream(
-            downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+            downStreamInstance.getHostDataNode().getDataBlockManagerEndPoint(),
+            downStreamInstance.getId(),
+            downStreamNodeId);
 
         // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
         PlanNode downStreamExchangeNode =
             downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
         ((ExchangeNode) downStreamExchangeNode)
-            .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getPlanNodeId());
+            .setUpstream(
+                instance.getHostDataNode().getDataBlockManagerEndPoint(),
+                instance.getId(),
+                sinkNode.getPlanNodeId());
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index b4a7819589..9a95356aca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -44,7 +44,7 @@ public class FragmentInstance implements IConsensusRequest {
   // The Region where the FragmentInstance should run
   private TRegionReplicaSet regionReplicaSet;
 
-  private TEndPoint hostEndpoint;
+  private TDataNodeLocation hostDataNode;
 
   private Filter timeFilter;
 
@@ -67,7 +67,7 @@ public class FragmentInstance implements IConsensusRequest {
     this.regionReplicaSet = regionReplicaSet;
     // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
     // instance
-    this.hostEndpoint = regionReplicaSet.getDataNodeLocations().get(0).getConsensusEndPoint();
+    this.hostDataNode = regionReplicaSet.getDataNodeLocations().get(0);
   }
 
   public TRegionReplicaSet getRegionReplicaSet() {
@@ -78,10 +78,6 @@ public class FragmentInstance implements IConsensusRequest {
     this.regionReplicaSet = regionReplicaSet;
   }
 
-  public TEndPoint getHostEndpoint() {
-    return hostEndpoint;
-  }
-
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -119,7 +115,8 @@ public class FragmentInstance implements IConsensusRequest {
     StringBuilder ret = new StringBuilder();
     ret.append(String.format("FragmentInstance-%s:", getId()));
     ret.append(
-        String.format("Host: %s", getHostEndpoint() == null ? "Not set" : getHostEndpoint()));
+        String.format(
+            "Host: %s", getHostDataNode() == null ? "Not set" : getHostDataNode().dataNodeId));
     ret.append(
         String.format(
             "Region: %s",
@@ -138,7 +135,7 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstance fragmentInstance =
         new FragmentInstance(planFragment, id, timeFilter, queryType);
     fragmentInstance.regionReplicaSet = ThriftCommonsSerDeUtils.readTRegionReplicaSet(buffer);
-    fragmentInstance.hostEndpoint = ThriftCommonsSerDeUtils.readTEndPoint(buffer);
+    fragmentInstance.hostDataNode = ThriftCommonsSerDeUtils.readTDataNodeLocation(buffer);
 
     return fragmentInstance;
   }
@@ -153,7 +150,7 @@ public class FragmentInstance implements IConsensusRequest {
     }
     ReadWriteIOUtils.write(type.ordinal(), buffer);
     ThriftCommonsSerDeUtils.writeTRegionReplicaSet(regionReplicaSet, buffer);
-    ThriftCommonsSerDeUtils.writeTEndPoint(hostEndpoint, buffer);
+    ThriftCommonsSerDeUtils.writeTDataNodeLocation(hostDataNode, buffer);
   }
 
   @Override
@@ -165,12 +162,16 @@ public class FragmentInstance implements IConsensusRequest {
         && type == instance.type
         && Objects.equals(fragment, instance.fragment)
         && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
-        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(hostDataNode, instance.hostDataNode)
         && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, regionReplicaSet, hostDataNode, timeFilter);
+  }
+
+  public TDataNodeLocation getHostDataNode() {
+    return hostDataNode;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 8aab414107..f8bfecbda2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -80,7 +80,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
@@ -100,7 +101,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -257,7 +259,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -294,7 +297,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
@@ -318,7 +322,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
@@ -360,7 +365,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
     context.setQueryType(QueryType.WRITE);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 7640a45224..1cfd02be15 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -51,7 +51,11 @@ public class QueryPlannerTest {
         new QueryExecution(
             stmt,
             new MPPQueryContext(
-                querySql, new QueryId("query1"), new SessionInfo(), new TEndPoint()),
+                querySql,
+                new QueryId("query1"),
+                new SessionInfo(),
+                new TEndPoint(),
+                new TEndPoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),