You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/26 13:48:08 UTC
[iotdb] 01/01: set correct InternalEndpoint and DataBlockEndpoint when palnning
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_endpoint_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 279e6c9bba7e38cad29f35f32023c003b2d5e7a9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 26 21:47:54 2022 +0800
set correct InternalEndpoint and DataBlockEndpoint when palnning
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 25 ++++++++++++++++------
.../apache/iotdb/db/mpp/execution/Coordinator.java | 19 ++++++++++------
.../iotdb/db/mpp/execution/QueryExecution.java | 5 +----
.../scheduler/AbstractFragInsStateTracker.java | 6 +-----
.../scheduler/SimpleFragInstanceDispatcher.java | 6 +-----
.../execution/scheduler/SimpleQueryTerminator.java | 8 ++-----
.../db/mpp/sql/planner/DistributionPlanner.java | 6 ++++--
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 13 +++--------
.../sql/planner/SimpleFragmentParallelPlanner.java | 9 ++++++--
.../db/mpp/sql/planner/plan/FragmentInstance.java | 25 +++++++++++-----------
.../db/mpp/sql/plan/DistributionPlannerTest.java | 18 ++++++++++------
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 6 +++++-
12 files changed, 79 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index da8d42c6de..110946f26c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -31,18 +31,25 @@ public class MPPQueryContext {
private SessionInfo session;
private QueryType queryType = QueryType.READ;
- private TEndPoint hostEndpoint;
+ private TEndPoint localDataBlockEndpoint;
+ private TEndPoint localInternalEndpoint;
private ResultNodeContext resultNodeContext;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
}
- public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, TEndPoint hostEndpoint) {
+ public MPPQueryContext(
+ String sql,
+ QueryId queryId,
+ SessionInfo session,
+ TEndPoint localDataBlockEndpoint,
+ TEndPoint localInternalEndpoint) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
- this.hostEndpoint = hostEndpoint;
+ this.localDataBlockEndpoint = localDataBlockEndpoint;
+ this.localInternalEndpoint = localInternalEndpoint;
this.resultNodeContext = new ResultNodeContext(queryId);
}
@@ -58,11 +65,15 @@ public class MPPQueryContext {
this.queryType = queryType;
}
- public TEndPoint getHostEndpoint() {
- return hostEndpoint;
- }
-
public ResultNodeContext getResultNodeContext() {
return resultNodeContext;
}
+
+ public TEndPoint getLocalDataBlockEndpoint() {
+ return localDataBlockEndpoint;
+ }
+
+ public TEndPoint getLocalInternalEndpoint() {
+ return localInternalEndpoint;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..97f19d33e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -50,7 +50,12 @@ public class Coordinator {
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
- private static final TEndPoint LOCAL_HOST =
+ private static final TEndPoint LOCAL_HOST_DATA_BLOCK_ENDPOINT =
+ new TEndPoint(
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+
+ private static final TEndPoint LOCAL_HOST_INTERNAL_ENDPOINT =
new TEndPoint(
IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
IoTDBDescriptor.getInstance().getConfig().getInternalPort());
@@ -92,7 +97,12 @@ public class Coordinator {
IQueryExecution execution =
createQueryExecution(
statement,
- new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+ new MPPQueryContext(
+ sql,
+ queryId,
+ session,
+ LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+ LOCAL_HOST_INTERNAL_ENDPOINT),
partitionFetcher,
schemaFetcher);
queryExecutionMap.put(queryId, execution);
@@ -116,11 +126,6 @@ public class Coordinator {
COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
}
- // Get the hostname of current coordinator
- private TEndPoint getHostEndpoint() {
- return LOCAL_HOST;
- }
-
public static Coordinator getInstance() {
return INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..6602f68382 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -291,9 +290,7 @@ public class QueryExecution implements IQueryExecution {
.createSourceHandle(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
- new TEndPoint(
- context.getResultNodeContext().getUpStreamEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ context.getLocalDataBlockEndpoint(),
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
stateMachine::transitionToFailed);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..b9c5c6dc38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -61,9 +59,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
// TODO (jackie tien) change the port
InternalService.Iface client =
InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ instance.getHostDataNode().internalEndPoint);
TFragmentInstanceStateResp resp =
client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..31a1bb6929 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
@@ -53,9 +51,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
// TODO: (jackie tien) change the port
InternalService.Iface client =
InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ instance.getHostDataNode().internalEndPoint);
// TODO: (xingtanzjr) consider how to handle the buffer here
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..b834a01dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -58,10 +57,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
for (TEndPoint endpoint : relatedHost) {
// TODO (jackie tien) change the port
InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- endpoint.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ InternalServiceClientFactory.getInternalServiceClient(endpoint);
client.cancelQuery(new TCancelQueryReq(queryId.getId()));
}
} catch (TException e) {
@@ -73,7 +69,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
return instances.stream()
- .map(FragmentInstance::getHostEndpoint)
+ .map(instance -> instance.getHostDataNode().internalEndPoint)
.distinct()
.collect(Collectors.toList());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 22b571ac3b..513941954e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -120,14 +120,16 @@ public class DistributionPlanner {
FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
sinkNode.setDownStream(
- context.getHostEndpoint(),
+ context.getLocalDataBlockEndpoint(),
context.getResultNodeContext().getVirtualFragmentInstanceId(),
context.getResultNodeContext().getVirtualResultNodeId());
sinkNode.setChild(rootInstance.getFragment().getRoot());
context
.getResultNodeContext()
.setUpStream(
- rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getPlanNodeId());
+ rootInstance.getHostDataNode().dataBlockManagerEndPoint,
+ rootInstance.getId(),
+ sinkNode.getPlanNodeId());
rootInstance.getFragment().setRoot(sinkNode);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index cfa593146a..70cf5d296a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -347,15 +345,12 @@ public class LocalExecutionPlanner {
SeriesScanOperator.class.getSimpleName());
FragmentInstanceId localInstanceId = context.instanceContext.getId();
FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
- TEndPoint source = node.getUpstreamEndpoint();
ISourceHandle sourceHandle =
DATA_BLOCK_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
- new TEndPoint(
- source.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ node.getUpstreamEndpoint(),
remoteInstanceId.toThrift(),
context.instanceContext::failed);
return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
@@ -364,15 +359,13 @@ public class LocalExecutionPlanner {
@Override
public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
- TEndPoint target = node.getDownStreamEndpoint();
+
FragmentInstanceId localInstanceId = context.instanceContext.getId();
FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
ISinkHandle sinkHandle =
DATA_BLOCK_MANAGER.createSinkHandle(
localInstanceId.toThrift(),
- new TEndPoint(
- target.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ node.getDownStreamEndpoint(),
targetInstanceId.toThrift(),
node.getDownStreamPlanNodeId().getId(),
context.instanceContext);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
index 83cffa59cf..b23369479b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
@@ -116,13 +116,18 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
PlanNodeId downStreamNodeId = sinkNode.getDownStreamPlanNodeId();
FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
sinkNode.setDownStream(
- downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+ downStreamInstance.getHostDataNode().getDataBlockManagerEndPoint(),
+ downStreamInstance.getId(),
+ downStreamNodeId);
// Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
PlanNode downStreamExchangeNode =
downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
((ExchangeNode) downStreamExchangeNode)
- .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getPlanNodeId());
+ .setUpstream(
+ instance.getHostDataNode().getDataBlockManagerEndPoint(),
+ instance.getId(),
+ sinkNode.getPlanNodeId());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index b4a7819589..9a95356aca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -44,7 +44,7 @@ public class FragmentInstance implements IConsensusRequest {
// The Region where the FragmentInstance should run
private TRegionReplicaSet regionReplicaSet;
- private TEndPoint hostEndpoint;
+ private TDataNodeLocation hostDataNode;
private Filter timeFilter;
@@ -67,7 +67,7 @@ public class FragmentInstance implements IConsensusRequest {
this.regionReplicaSet = regionReplicaSet;
// TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
// instance
- this.hostEndpoint = regionReplicaSet.getDataNodeLocations().get(0).getConsensusEndPoint();
+ this.hostDataNode = regionReplicaSet.getDataNodeLocations().get(0);
}
public TRegionReplicaSet getRegionReplicaSet() {
@@ -78,10 +78,6 @@ public class FragmentInstance implements IConsensusRequest {
this.regionReplicaSet = regionReplicaSet;
}
- public TEndPoint getHostEndpoint() {
- return hostEndpoint;
- }
-
public PlanFragment getFragment() {
return fragment;
}
@@ -119,7 +115,8 @@ public class FragmentInstance implements IConsensusRequest {
StringBuilder ret = new StringBuilder();
ret.append(String.format("FragmentInstance-%s:", getId()));
ret.append(
- String.format("Host: %s", getHostEndpoint() == null ? "Not set" : getHostEndpoint()));
+ String.format(
+ "Host: %s", getHostDataNode() == null ? "Not set" : getHostDataNode().dataNodeId));
ret.append(
String.format(
"Region: %s",
@@ -138,7 +135,7 @@ public class FragmentInstance implements IConsensusRequest {
FragmentInstance fragmentInstance =
new FragmentInstance(planFragment, id, timeFilter, queryType);
fragmentInstance.regionReplicaSet = ThriftCommonsSerDeUtils.readTRegionReplicaSet(buffer);
- fragmentInstance.hostEndpoint = ThriftCommonsSerDeUtils.readTEndPoint(buffer);
+ fragmentInstance.hostDataNode = ThriftCommonsSerDeUtils.readTDataNodeLocation(buffer);
return fragmentInstance;
}
@@ -153,7 +150,7 @@ public class FragmentInstance implements IConsensusRequest {
}
ReadWriteIOUtils.write(type.ordinal(), buffer);
ThriftCommonsSerDeUtils.writeTRegionReplicaSet(regionReplicaSet, buffer);
- ThriftCommonsSerDeUtils.writeTEndPoint(hostEndpoint, buffer);
+ ThriftCommonsSerDeUtils.writeTDataNodeLocation(hostDataNode, buffer);
}
@Override
@@ -165,12 +162,16 @@ public class FragmentInstance implements IConsensusRequest {
&& type == instance.type
&& Objects.equals(fragment, instance.fragment)
&& Objects.equals(regionReplicaSet, instance.regionReplicaSet)
- && Objects.equals(hostEndpoint, instance.hostEndpoint)
+ && Objects.equals(hostDataNode, instance.hostDataNode)
&& Objects.equals(timeFilter, instance.timeFilter);
}
@Override
public int hashCode() {
- return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
+ return Objects.hash(id, type, fragment, regionReplicaSet, hostDataNode, timeFilter);
+ }
+
+ public TDataNodeLocation getHostDataNode() {
+ return hostDataNode;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 8aab414107..f8bfecbda2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -80,7 +80,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
@@ -100,7 +101,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -257,7 +259,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -294,7 +297,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
@@ -318,7 +322,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
context.setQueryType(QueryType.WRITE);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
@@ -360,7 +365,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
context.setQueryType(QueryType.WRITE);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 7640a45224..1cfd02be15 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -51,7 +51,11 @@ public class QueryPlannerTest {
new QueryExecution(
stmt,
new MPPQueryContext(
- querySql, new QueryId("query1"), new SessionInfo(), new TEndPoint()),
+ querySql,
+ new QueryId("query1"),
+ new SessionInfo(),
+ new TEndPoint(),
+ new TEndPoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
new FakePartitionFetcherImpl(),