You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/15 12:19:04 UTC

[iotdb] branch stable-pp created (now 3e056a6a89)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch stable-pp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3e056a6a89 fix ci

This branch includes the following new commits:

     new 3e056a6a89 fix ci

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix ci

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-pp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3e056a6a89f1adf252c5d166f52eca5093d4e97a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 15 20:18:45 2022 +0800

    fix ci
---
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  4 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  2 -
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  6 ++-
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 23 +++++++++--
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |  3 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 22 ++++++++--
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 11 ++---
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  3 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 48 ++++++++++++++++++----
 .../iotdb/db/mpp/sql/analyze/AnalyzerTest.java     |  6 ++-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  5 ++-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 ++-
 12 files changed, 107 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 96802c3bd5..2934a6bf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -108,8 +108,8 @@ public class SinkHandle implements ISinkHandle {
     // TODO: (xingtanzjr)
     // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
     // called) is sent before NewDataBlockEvent arrived
-    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
-    //    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+    //    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index af6e715a16..b577174702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,8 +212,6 @@ public class SourceHandle implements ISourceHandle {
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
     this.lastSequenceId = lastSequenceId;
     noMoreTsBlocks = true;
-    // someone may be waiting for this blocked, so here we need to notify it
-    blocked.set(null);
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index dc09a1c038..7753bc3ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -34,6 +34,8 @@ public class StubSinkHandle implements ISinkHandle {
 
   private final List<TsBlock> tsBlocks = new ArrayList<>();
 
+  private boolean closed = false;
+
   @Override
   public long getBufferRetainedSizeInBytes() {
     return 0;
@@ -64,7 +66,7 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public boolean isClosed() {
-    return false;
+    return closed;
   }
 
   @Override
@@ -74,7 +76,7 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public void close() {
-    tsBlocks.clear();
+    closed = true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4bb6bfbf58..d92368cee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
@@ -64,20 +66,33 @@ public class Coordinator {
     this.scheduledExecutor = getScheduledExecutor();
   }
 
-  private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+  private IQueryExecution createQueryExecution(
+      Statement statement,
+      MPPQueryContext queryContext,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     if (statement instanceof SetStorageGroupStatement) {
       queryContext.setQueryType(QueryType.WRITE);
       return new ConfigExecution(queryContext, statement, executor);
     }
-    return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
+    return new QueryExecution(
+        statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
   }
 
   public ExecutionResult execute(
-      Statement statement, QueryId queryId, SessionInfo session, String sql) {
+      Statement statement,
+      QueryId queryId,
+      SessionInfo session,
+      String sql,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
 
     IQueryExecution execution =
         createQueryExecution(
-            statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
+            statement,
+            new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+            partitionFetcher,
+            schemaFetcher);
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index ce6a513080..88c160dd28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -155,6 +155,9 @@ public class DataDriver implements Driver {
 
   @Override
   public void close() {
+    if (closed) {
+      return;
+    }
     closed = true;
     try {
       if (root != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 99241305d9..e7df772ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
@@ -70,6 +72,10 @@ public class QueryExecution implements IQueryExecution {
 
   private final ExecutorService executor;
   private final ScheduledExecutorService scheduledExecutor;
+  // TODO need to use factory to decide standalone or cluster
+  private final IPartitionFetcher partitionFetcher;
+  // TODO need to use factory to decide standalone or cluster
+  private final ISchemaFetcher schemaFetcher;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
@@ -79,13 +85,17 @@ public class QueryExecution implements IQueryExecution {
       Statement statement,
       MPPQueryContext context,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
-    this.analysis = analyze(statement, context);
+    this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
     // TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
     //    resultHandle = xxxx
 
@@ -110,9 +120,13 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
-  private static Analysis analyze(Statement statement, MPPQueryContext context) {
+  private static Analysis analyze(
+      Statement statement,
+      MPPQueryContext context,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
-    return new Analyzer(context).analyze(statement);
+    return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
   }
 
   private void schedule() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index d8fcd29210..7c783694c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -70,13 +70,14 @@ public class Analyzer {
 
   private final MPPQueryContext context;
 
-  // TODO need to use factory to decide standalone or cluster
-  private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
-  // TODO need to use factory to decide standalone or cluster
-  private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
 
-  public Analyzer(MPPQueryContext context) {
+  public Analyzer(
+      MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
     this.context = context;
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
   }
 
   public Analysis analyze(Statement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index ca7f1f06cb..783372eca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -71,7 +71,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     QueryId queryId =
         new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
-    ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+    ExecutionResult executionResult =
+        coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new RuntimeException("cannot fetch schema");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index f3cfdf5bf2..4b86aca612 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -70,6 +74,10 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
+  private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+
+  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
@@ -216,7 +224,13 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
       QueryId id = new QueryId(String.valueOf(queryId));
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+          COORDINATOR.execute(
+              s,
+              id,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              statement,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException("");
@@ -316,7 +330,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -357,7 +373,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -398,7 +416,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -435,7 +455,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -467,7 +489,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -500,7 +524,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -538,7 +564,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -672,7 +700,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index d8d1880232..4437a5551e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -48,7 +48,11 @@ public class AnalyzerTest {
 
   private void assertAnalyzeSemanticException(String sql, String message) {
     try {
-      Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
+      Analyzer analyzer =
+          new Analyzer(
+              new MPPQueryContext(new QueryId("test_query")),
+              new FakePartitionFetcherImpl(),
+              new FakeSchemaFetcherImpl());
       analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
       fail();
     } catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 73001f41b3..94e5f23a38 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -868,7 +870,8 @@ public class LogicalPlannerTest {
       Statement statement =
           StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
       MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-      Analyzer analyzer = new Analyzer(context);
+      Analyzer analyzer =
+          new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
       Analysis analysis = analyzer.analyze(statement);
       LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
       planNode = planner.plan(analysis).getRootNode();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index ca4d50f542..a0b28041d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,7 +52,9 @@ public class QueryPlannerTest {
             stmt,
             new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
-            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
+            new FakePartitionFetcherImpl(),
+            new FakeSchemaFetcherImpl());
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");