You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/15 12:19:05 UTC

[iotdb] 01/01: fix ci

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-pp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3e056a6a89f1adf252c5d166f52eca5093d4e97a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 15 20:18:45 2022 +0800

    fix ci
---
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  4 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  2 -
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  6 ++-
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 23 +++++++++--
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |  3 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 22 ++++++++--
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 11 ++---
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  3 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 48 ++++++++++++++++++----
 .../iotdb/db/mpp/sql/analyze/AnalyzerTest.java     |  6 ++-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  5 ++-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 ++-
 12 files changed, 107 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 96802c3bd5..2934a6bf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -108,8 +108,8 @@ public class SinkHandle implements ISinkHandle {
     // TODO: (xingtanzjr)
     // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
     // called) is sent before NewDataBlockEvent arrived
-    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
-    //    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+    //    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index af6e715a16..b577174702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,8 +212,6 @@ public class SourceHandle implements ISourceHandle {
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
     this.lastSequenceId = lastSequenceId;
     noMoreTsBlocks = true;
-    // someone may be waiting for this blocked, so here we need to notify it
-    blocked.set(null);
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index dc09a1c038..7753bc3ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -34,6 +34,8 @@ public class StubSinkHandle implements ISinkHandle {
 
   private final List<TsBlock> tsBlocks = new ArrayList<>();
 
+  private boolean closed = false;
+
   @Override
   public long getBufferRetainedSizeInBytes() {
     return 0;
@@ -64,7 +66,7 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public boolean isClosed() {
-    return false;
+    return closed;
   }
 
   @Override
@@ -74,7 +76,7 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public void close() {
-    tsBlocks.clear();
+    closed = true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4bb6bfbf58..d92368cee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
@@ -64,20 +66,33 @@ public class Coordinator {
     this.scheduledExecutor = getScheduledExecutor();
   }
 
-  private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+  private IQueryExecution createQueryExecution(
+      Statement statement,
+      MPPQueryContext queryContext,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     if (statement instanceof SetStorageGroupStatement) {
       queryContext.setQueryType(QueryType.WRITE);
       return new ConfigExecution(queryContext, statement, executor);
     }
-    return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
+    return new QueryExecution(
+        statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
   }
 
   public ExecutionResult execute(
-      Statement statement, QueryId queryId, SessionInfo session, String sql) {
+      Statement statement,
+      QueryId queryId,
+      SessionInfo session,
+      String sql,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
 
     IQueryExecution execution =
         createQueryExecution(
-            statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
+            statement,
+            new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+            partitionFetcher,
+            schemaFetcher);
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index ce6a513080..88c160dd28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -155,6 +155,9 @@ public class DataDriver implements Driver {
 
   @Override
   public void close() {
+    if (closed) {
+      return;
+    }
     closed = true;
     try {
       if (root != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 99241305d9..e7df772ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
@@ -70,6 +72,10 @@ public class QueryExecution implements IQueryExecution {
 
   private final ExecutorService executor;
   private final ScheduledExecutorService scheduledExecutor;
+  // TODO need to use factory to decide standalone or cluster
+  private final IPartitionFetcher partitionFetcher;
+  // TODO need to use factory to decide standalone or cluster
+  private final ISchemaFetcher schemaFetcher;
 
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
@@ -79,13 +85,17 @@ public class QueryExecution implements IQueryExecution {
       Statement statement,
       MPPQueryContext context,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
-    this.analysis = analyze(statement, context);
+    this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
     // TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
     //    resultHandle = xxxx
 
@@ -110,9 +120,13 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
-  private static Analysis analyze(Statement statement, MPPQueryContext context) {
+  private static Analysis analyze(
+      Statement statement,
+      MPPQueryContext context,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
-    return new Analyzer(context).analyze(statement);
+    return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
   }
 
   private void schedule() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index d8fcd29210..7c783694c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -70,13 +70,14 @@ public class Analyzer {
 
   private final MPPQueryContext context;
 
-  // TODO need to use factory to decide standalone or cluster
-  private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
-  // TODO need to use factory to decide standalone or cluster
-  private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
 
-  public Analyzer(MPPQueryContext context) {
+  public Analyzer(
+      MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
     this.context = context;
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
   }
 
   public Analysis analyze(Statement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index ca7f1f06cb..783372eca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -71,7 +71,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     QueryId queryId =
         new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
-    ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+    ExecutionResult executionResult =
+        coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new RuntimeException("cannot fetch schema");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index f3cfdf5bf2..4b86aca612 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -70,6 +74,10 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
+  private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+
+  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
@@ -216,7 +224,13 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
       QueryId id = new QueryId(String.valueOf(queryId));
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+          COORDINATOR.execute(
+              s,
+              id,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              statement,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException("");
@@ -316,7 +330,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -357,7 +373,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -398,7 +416,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -435,7 +455,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -467,7 +489,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -500,7 +524,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -538,7 +564,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -672,7 +700,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               statement,
               new QueryId(String.valueOf(queryId)),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
-              "");
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
 
       // TODO(INSERT) do this check in analyze
       //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index d8d1880232..4437a5551e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -48,7 +48,11 @@ public class AnalyzerTest {
 
   private void assertAnalyzeSemanticException(String sql, String message) {
     try {
-      Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
+      Analyzer analyzer =
+          new Analyzer(
+              new MPPQueryContext(new QueryId("test_query")),
+              new FakePartitionFetcherImpl(),
+              new FakeSchemaFetcherImpl());
       analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
       fail();
     } catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 73001f41b3..94e5f23a38 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -868,7 +870,8 @@ public class LogicalPlannerTest {
       Statement statement =
           StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
       MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-      Analyzer analyzer = new Analyzer(context);
+      Analyzer analyzer =
+          new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
       Analysis analysis = analyzer.analyze(statement);
       LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
       planNode = planner.plan(analysis).getRootNode();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index ca4d50f542..a0b28041d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,7 +52,9 @@ public class QueryPlannerTest {
             stmt,
             new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
-            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
+            new FakePartitionFetcherImpl(),
+            new FakeSchemaFetcherImpl());
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");