You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/15 15:20:43 UTC
[iotdb] branch master updated: Fix mpp CI (#5562)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b2dfcf847b Fix mpp CI (#5562)
b2dfcf847b is described below
commit b2dfcf847be87958737e1fa257ac9f8a62c90daa
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 15 23:20:37 2022 +0800
Fix mpp CI (#5562)
---
.../iotdb/commons/consensus/DataRegionId.java | 4 ++
.../iotdb/commons/consensus/PartitionRegionId.java | 4 ++
.../iotdb/commons/consensus/SchemaRegionId.java | 4 ++
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 4 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 2 -
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 6 ++-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 23 +++++++++--
.../apache/iotdb/db/mpp/execution/DataDriver.java | 3 ++
.../iotdb/db/mpp/execution/QueryExecution.java | 22 ++++++++--
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 11 ++---
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 3 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 13 ++++--
.../thrift/impl/DataNodeTSIServiceImpl.java | 48 ++++++++++++++++++----
.../iotdb/db/mpp/sql/analyze/AnalyzerTest.java | 6 ++-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 10 ++++-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 5 ++-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 6 ++-
17 files changed, 137 insertions(+), 37 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 3085541766..b0c5d1ec46 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -75,4 +75,8 @@ public class DataRegionId implements ConsensusGroupId {
public int hashCode() {
return Objects.hash(id, GroupType.DataRegion);
}
+
+ public String toString() {
+ return String.format("%s[%d]", getType(), getId());
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 86f5fe963c..b9f64786c5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -75,4 +75,8 @@ public class PartitionRegionId implements ConsensusGroupId {
public int hashCode() {
return Objects.hash(id, GroupType.PartitionRegion);
}
+
+ public String toString() {
+ return String.format("%s[%d]", getType(), getId());
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index a42ddf3ad0..9649653c7f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -75,4 +75,8 @@ public class SchemaRegionId implements ConsensusGroupId {
public int hashCode() {
return Objects.hash(id, GroupType.SchemaRegion);
}
+
+ public String toString() {
+ return String.format("%s[%d]", getType(), getId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 96802c3bd5..2934a6bf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -108,8 +108,8 @@ public class SinkHandle implements ISinkHandle {
// TODO: (xingtanzjr)
// We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
// called) is sent before NewDataBlockEvent arrived
- new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
- // executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+ // new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+ executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index af6e715a16..b577174702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,8 +212,6 @@ public class SourceHandle implements ISourceHandle {
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
this.lastSequenceId = lastSequenceId;
noMoreTsBlocks = true;
- // someone may be waiting for this blocked, so here we need to notify it
- blocked.set(null);
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index dc09a1c038..7753bc3ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -34,6 +34,8 @@ public class StubSinkHandle implements ISinkHandle {
private final List<TsBlock> tsBlocks = new ArrayList<>();
+ private boolean closed = false;
+
@Override
public long getBufferRetainedSizeInBytes() {
return 0;
@@ -64,7 +66,7 @@ public class StubSinkHandle implements ISinkHandle {
@Override
public boolean isClosed() {
- return false;
+ return closed;
}
@Override
@@ -74,7 +76,7 @@ public class StubSinkHandle implements ISinkHandle {
@Override
public void close() {
- tsBlocks.clear();
+ closed = true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4bb6bfbf58..d92368cee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
@@ -64,20 +66,33 @@ public class Coordinator {
this.scheduledExecutor = getScheduledExecutor();
}
- private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+ private IQueryExecution createQueryExecution(
+ Statement statement,
+ MPPQueryContext queryContext,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
if (statement instanceof SetStorageGroupStatement) {
queryContext.setQueryType(QueryType.WRITE);
return new ConfigExecution(queryContext, statement, executor);
}
- return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
+ return new QueryExecution(
+ statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
}
public ExecutionResult execute(
- Statement statement, QueryId queryId, SessionInfo session, String sql) {
+ Statement statement,
+ QueryId queryId,
+ SessionInfo session,
+ String sql,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
IQueryExecution execution =
createQueryExecution(
- statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
+ statement,
+ new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+ partitionFetcher,
+ schemaFetcher);
queryExecutionMap.put(queryId, execution);
execution.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index ce6a513080..88c160dd28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -155,6 +155,9 @@ public class DataDriver implements Driver {
@Override
public void close() {
+ if (closed) {
+ return;
+ }
closed = true;
try {
if (root != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 99241305d9..e7df772ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
@@ -70,6 +72,10 @@ public class QueryExecution implements IQueryExecution {
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
+ // TODO need to use factory to decide standalone or cluster
+ private final IPartitionFetcher partitionFetcher;
+ // TODO need to use factory to decide standalone or cluster
+ private final ISchemaFetcher schemaFetcher;
// The result of QueryExecution will be written to the DataBlockManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
@@ -79,13 +85,17 @@ public class QueryExecution implements IQueryExecution {
Statement statement,
MPPQueryContext context,
ExecutorService executor,
- ScheduledExecutorService scheduledExecutor) {
+ ScheduledExecutorService scheduledExecutor,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
- this.analysis = analyze(statement, context);
+ this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
// TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
// resultHandle = xxxx
@@ -110,9 +120,13 @@ public class QueryExecution implements IQueryExecution {
}
// Analyze the statement in QueryContext. Generate the analysis this query need
- private static Analysis analyze(Statement statement, MPPQueryContext context) {
+ private static Analysis analyze(
+ Statement statement,
+ MPPQueryContext context,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
// initialize the variable `analysis`
- return new Analyzer(context).analyze(statement);
+ return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
}
private void schedule() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index d8fcd29210..7c783694c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -70,13 +70,14 @@ public class Analyzer {
private final MPPQueryContext context;
- // TODO need to use factory to decide standalone or cluster
- private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
- // TODO need to use factory to decide standalone or cluster
- private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+ private final IPartitionFetcher partitionFetcher;
+ private final ISchemaFetcher schemaFetcher;
- public Analyzer(MPPQueryContext context) {
+ public Analyzer(
+ MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
this.context = context;
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
}
public Analysis analyze(Statement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index ca7f1f06cb..783372eca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -71,7 +71,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
QueryId queryId =
new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
- ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+ ExecutionResult executionResult =
+ coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
// TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("cannot fetch schema");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 909ab98080..ca42ec2336 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanN
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -376,9 +377,15 @@ public class DistributionPlanner {
private RegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
- // We always make the dataRegion of TimeJoinNode to be the same as its first child.
- // TODO: (xingtanzjr) We need to implement more suitable policies here
- return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
+ // Step 1: calculate the count of children group by DataRegion.
+ Map<RegionReplicaSet, Long> groupByRegion =
+ children.stream()
+ .collect(
+ Collectors.groupingBy(
+ child -> context.getNodeDistribution(child.getPlanNodeId()).region,
+ Collectors.counting()));
+ // Step 2: return the RegionReplicaSet with max count
+ return Collections.max(groupByRegion.entrySet(), Map.Entry.comparingByValue()).getKey();
}
private RegionReplicaSet calculateSchemaRegionByChildren(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index f3cfdf5bf2..4b86aca612 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.Coordinator;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -70,6 +74,10 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+ private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+
+ private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
@@ -216,7 +224,13 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
QueryId id = new QueryId(String.valueOf(queryId));
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+ COORDINATOR.execute(
+ s,
+ id,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ statement,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("");
@@ -316,7 +330,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -357,7 +373,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -398,7 +416,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -435,7 +455,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -467,7 +489,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -500,7 +524,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -538,7 +564,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -672,7 +700,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index d8d1880232..4437a5551e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -48,7 +48,11 @@ public class AnalyzerTest {
private void assertAnalyzeSemanticException(String sql, String message) {
try {
- Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
+ Analyzer analyzer =
+ new Analyzer(
+ new MPPQueryContext(new QueryId("test_query")),
+ new FakePartitionFetcherImpl(),
+ new FakeSchemaFetcherImpl());
analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
fail();
} catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index cbd56cc790..21ca0ed2d1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -99,7 +100,7 @@ public class DistributionPlannerTest {
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
PlanNode newRoot = planner.rewriteSource();
- assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
+ assertEquals(4, newRoot.getChildren().get(0).getChildren().size());
}
@Test
@@ -181,7 +182,12 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
PlanNode rootAfterRewrite = planner.rewriteSource();
PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
- assertEquals(rootWithExchange.getChildren().get(0).getChildren().size(), 3);
+ assertEquals(4, rootWithExchange.getChildren().get(0).getChildren().size());
+ int exchangeNodeCount = 0;
+ for (PlanNode child : rootWithExchange.getChildren().get(0).getChildren()) {
+ exchangeNodeCount += child instanceof ExchangeNode ? 1 : 0;
+ }
+ assertEquals(2, exchangeNodeCount);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 73001f41b3..94e5f23a38 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -868,7 +870,8 @@ public class LogicalPlannerTest {
Statement statement =
StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
- Analyzer analyzer = new Analyzer(context);
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
Analysis analysis = analyzer.analyze(statement);
LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
planNode = planner.plan(analysis).getRootNode();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index ca4d50f542..a0b28041d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,7 +52,9 @@ public class QueryPlannerTest {
stmt,
new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
+ new FakePartitionFetcherImpl(),
+ new FakeSchemaFetcherImpl());
queryExecution.doLogicalPlan();
System.out.printf("SQL: %s%n%n", querySql);
System.out.println("===== Step 1: Logical Plan =====");