You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/02/13 06:51:35 UTC
[iotdb] branch master updated: [IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a9168656d6 [IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)
a9168656d6 is described below
commit a9168656d680a4e2f37243adf9a226f92060a532
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Feb 13 14:51:29 2023 +0800
[IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)
---
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 14 ++-
.../db/mpp/plan/execution/QueryExecution.java | 18 +++-
.../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 105 +++++++++++++++++++++
.../plan/scheduler/AsyncSendPlanNodeHandler.java | 61 ++++++++++++
.../db/mpp/plan/scheduler/ClusterScheduler.java | 12 ++-
.../scheduler/FragmentInstanceDispatcherImpl.java | 60 +++++++++++-
.../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 17 +++-
7 files changed, 266 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 731b650dad..2a2ae6f0d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -69,11 +70,17 @@ public class Coordinator {
LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- INTERNAL_SERVICE_CLIENT_MANAGER =
+ SYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ private static final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
@@ -112,7 +119,8 @@ public class Coordinator {
scheduledExecutor,
partitionFetcher,
schemaFetcher,
- INTERNAL_SERVICE_CLIENT_MANAGER);
+ SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
}
public ExecutionResult execute(
@@ -222,7 +230,7 @@ public class Coordinator {
public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
getInternalServiceClientManager() {
- return INTERNAL_SERVICE_CLIENT_MANAGER;
+ return SYNC_INTERNAL_SERVICE_CLIENT_MANAGER;
}
public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index b0326403b0..284a634444 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -121,7 +122,10 @@ public class QueryExecution implements IQueryExecution {
private ISourceHandle resultHandle;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- internalServiceClientManager;
+ syncInternalServiceClientManager;
+
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager;
private AtomicBoolean stopped;
@@ -137,7 +141,9 @@ public class QueryExecution implements IQueryExecution {
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+ IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager) {
this.rawStatement = statement;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
@@ -148,7 +154,8 @@ public class QueryExecution implements IQueryExecution {
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
- this.internalServiceClientManager = internalServiceClientManager;
+ this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+ this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
@@ -267,7 +274,7 @@ public class QueryExecution implements IQueryExecution {
if (rawStatement instanceof LoadTsFileStatement) {
this.scheduler =
new LoadTsFileScheduler(
- distributedPlan, context, stateMachine, internalServiceClientManager);
+ distributedPlan, context, stateMachine, syncInternalServiceClientManager);
this.scheduler.start();
return;
}
@@ -282,7 +289,8 @@ public class QueryExecution implements IQueryExecution {
executor,
writeOperationExecutor,
scheduledExecutor,
- internalServiceClientManager);
+ syncInternalServiceClientManager,
+ asyncInternalServiceClientManager);
this.scheduler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
new file mode 100644
index 0000000000..957f638ed1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+/**
+ * Sends the plan node of each given {@link FragmentInstance} to the instance's host DataNode
+ * through an {@link AsyncDataNodeInternalServiceClient} and gathers the responses.
+ *
+ * <p>Intended call order: {@link #sendAll()}, then {@link #waitUntilCompleted()}, then {@link
+ * #getResult()}.
+ */
+public class AsyncPlanNodeSender {
+  private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
+  private final List<FragmentInstance> instances;
+  // initialised to instances.size(); handed to every AsyncSendPlanNodeHandler created in sendAll()
+  private final CountDownLatch countDownLatch;
+  // key: index of the instance inside `instances`; value: the response recorded for it
+  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  /**
+   * @param asyncInternalServiceClientManager manager the async clients are borrowed from
+   * @param instances the fragment instances whose plan nodes are to be sent
+   */
+  public AsyncPlanNodeSender(
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager,
+      List<FragmentInstance> instances) {
+    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
+    this.instances = instances;
+    this.countDownLatch = new CountDownLatch(instances.size());
+    this.instanceId2RespMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Submits one sendPlanNode request per instance. Any exception raised while building or
+   * submitting a request is passed to that instance's handler via {@code onError} instead of being
+   * propagated to the caller.
+   */
+  public void sendAll() {
+    for (int i = 0; i < instances.size(); ++i) {
+      FragmentInstance instance = instances.get(i);
+      // the loop index doubles as the key under which the response is stored
+      AsyncSendPlanNodeHandler handler =
+          new AsyncSendPlanNodeHandler(i, countDownLatch, instanceId2RespMap);
+      try {
+        TSendPlanNodeReq sendPlanNodeReq =
+            new TSendPlanNodeReq(
+                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                instance.getRegionReplicaSet().getRegionId());
+        AsyncDataNodeInternalServiceClient client =
+            asyncInternalServiceClientManager.borrowClient(
+                instance.getHostDataNode().getInternalEndPoint());
+        client.sendPlanNode(sendPlanNodeReq, handler);
+      } catch (Exception e) {
+        handler.onError(e);
+      }
+    }
+  }
+
+  /**
+   * Blocks until the latch shared with the handlers reaches zero.
+   *
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public void waitUntilCompleted() throws InterruptedException {
+    countDownLatch.await();
+  }
+
+  /**
+   * Inspects the responses recorded so far.
+   *
+   * @return an already-completed future carrying the status of the first non-accepted response
+   *     encountered while iterating the response map (or a {@code WRITE_PROCESS_ERROR} status
+   *     built from its message when that response has no status), or a successful result when no
+   *     recorded response was rejected
+   */
+  public Future<FragInstanceDispatchResult> getResult() {
+    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+      TSendPlanNodeResp resp = entry.getValue();
+      if (resp.accepted) {
+        continue;
+      }
+      // A rejected response may come without a status (handled below), so the status code must
+      // not be dereferenced unconditionally while logging.
+      boolean hasStatus = resp.getStatus() != null;
+      logger.warn(
+          "dispatch write failed. status: {}, code: {}, message: {}, node {}",
+          resp.status,
+          hasStatus ? TSStatusCode.representOf(resp.status.code) : null,
+          resp.message,
+          instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+      if (!hasStatus) {
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, resp.getMessage())));
+      }
+      return immediateFuture(new FragInstanceDispatchResult(resp.getStatus()));
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
new file mode 100644
index 0000000000..9b8f88b9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Callback for one asynchronous sendPlanNode request. Whatever the outcome, a response is stored
+ * in the shared map under this handler's instance id and the shared latch is counted down once.
+ */
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
+  private final int instanceId;
+  private final CountDownLatch countDownLatch;
+  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  /**
+   * @param instanceId key under which the response is stored
+   * @param countDownLatch latch counted down after the response has been stored
+   * @param instanceId2RespMap map receiving the response
+   */
+  public AsyncSendPlanNodeHandler(
+      int instanceId,
+      CountDownLatch countDownLatch,
+      Map<Integer, TSendPlanNodeResp> instanceId2RespMap) {
+    this.instanceId = instanceId;
+    this.countDownLatch = countDownLatch;
+    this.instanceId2RespMap = instanceId2RespMap;
+  }
+
+  /** Stores the received response as-is. */
+  @Override
+  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
+    storeAndCountDown(tSendPlanNodeResp);
+  }
+
+  /**
+   * Stores a synthetic, non-accepted response whose message and status describe the exception.
+   */
+  @Override
+  public void onError(Exception e) {
+    String failure = String.format("Fail to send plan node, exception message: %s", e);
+    TSendPlanNodeResp rejected = new TSendPlanNodeResp();
+    rejected.setAccepted(false);
+    rejected.setMessage(failure);
+    rejected.setStatus(
+        RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), failure));
+    storeAndCountDown(rejected);
+  }
+
+  // The response is put into the map before the latch is released, so it is already recorded
+  // when a thread waiting on the latch proceeds.
+  private void storeAndCountDown(TSendPlanNodeResp resp) {
+    instanceId2RespMap.put(instanceId, resp);
+    countDownLatch.countDown();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 773f4b3165..d704726266 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -74,7 +75,9 @@ public class ClusterScheduler implements IScheduler {
ExecutorService executor,
ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+ IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager) {
this.stateMachine = stateMachine;
this.instances = instances;
this.queryType = queryType;
@@ -84,17 +87,18 @@ public class ClusterScheduler implements IScheduler {
queryContext,
executor,
writeOperationExecutor,
- internalServiceClientManager);
+ syncInternalServiceClientManager,
+ asyncInternalServiceClientManager);
if (queryType == QueryType.READ) {
this.stateTracker =
new FixedRateFragInsStateTracker(
- stateMachine, scheduledExecutor, instances, internalServiceClientManager);
+ stateMachine, scheduledExecutor, instances, syncInternalServiceClientManager);
this.queryTerminator =
new SimpleQueryTerminator(
scheduledExecutor,
queryContext,
instances,
- internalServiceClientManager,
+ syncInternalServiceClientManager,
stateTracker);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 0007d17f18..0425b4fc72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -68,7 +69,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private final String localhostIpAddr;
private final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- internalServiceClientManager;
+ syncInternalServiceClientManager;
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager;
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
@@ -77,12 +80,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
MPPQueryContext queryContext,
ExecutorService executor,
ExecutorService writeOperationExecutor,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+ IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager) {
this.type = type;
this.queryContext = queryContext;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
- this.internalServiceClientManager = internalServiceClientManager;
+ this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+ this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
}
@@ -92,7 +98,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
if (type == QueryType.READ) {
return dispatchRead(instances);
} else {
- return dispatchWriteSync(instances);
+ return dispatchWriteAsync(instances);
}
}
@@ -154,6 +160,50 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
}
+  /**
+   * Dispatches write instances: remote ones are submitted through an {@link AsyncPlanNodeSender},
+   * local ones are dispatched one by one on the calling thread, and finally the remote responses
+   * are awaited.
+   *
+   * <p>If a local dispatch fails, the failure is returned right away without waiting for the
+   * remote responses.
+   *
+   * @param instances the instances to dispatch
+   * @return an already-completed future describing the first local failure, an interruption, or
+   *     the outcome reported by the remote sender
+   */
+  private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInstance> instances) {
+    // partition the instances by whether their host endpoint is this node
+    List<FragmentInstance> localOnes = new ArrayList<>();
+    List<FragmentInstance> remoteOnes = new ArrayList<>();
+    for (FragmentInstance candidate : instances) {
+      boolean toLocal = isDispatchedToLocal(candidate.getHostDataNode().getInternalEndPoint());
+      (toLocal ? localOnes : remoteOnes).add(candidate);
+    }
+
+    // submit the remote requests before doing the local work
+    AsyncPlanNodeSender remoteSender =
+        new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteOnes);
+    remoteSender.sendAll();
+
+    // local instances run synchronously, each under a thread name derived from its id
+    for (FragmentInstance target : localOnes) {
+      try (SetThreadName ignored = new SetThreadName(target.getId().getFullId())) {
+        dispatchOneInstance(target);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.warn("[DispatchFailed]", t);
+        String detail = "Unexpected errors: " + t.getMessage();
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, detail)));
+      }
+    }
+
+    // block until every remote request has been answered or has failed
+    try {
+      remoteSender.waitUntilCompleted();
+    } catch (InterruptedException e) {
+      // restore the interrupt flag for callers further up the stack
+      Thread.currentThread().interrupt();
+      logger.error("Interrupted when dispatching write async", e);
+      String detail = "Interrupted errors: " + e.getMessage();
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, detail)));
+    }
+    return remoteSender.getResult();
+  }
+
private void dispatchOneInstance(FragmentInstance instance)
throws FragmentInstanceDispatchException {
TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
@@ -171,7 +221,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
+ syncInternalServiceClientManager.borrowClient(endPoint)) {
switch (instance.getType()) {
case READ:
TSendFragmentInstanceReq sendFragmentInstanceReq =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index 5d0c47d011..20881e3a3a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.plan;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -45,19 +46,26 @@ import java.time.ZoneId;
public class QueryPlannerTest {
private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- internalServiceClientManager;
+ syncInternalServiceClientManager;
+
+ private static IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager;
@BeforeClass
public static void setUp() {
- internalServiceClientManager =
+ syncInternalServiceClientManager =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ asyncInternalServiceClientManager =
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
}
@AfterClass
public static void destroy() {
- internalServiceClientManager.close();
+ // setUp() creates both client managers, so both are closed here
+ syncInternalServiceClientManager.close();
+ asyncInternalServiceClientManager.close();
}
@Ignore
@@ -82,7 +90,8 @@ public class QueryPlannerTest {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
new FakePartitionFetcherImpl(),
new FakeSchemaFetcherImpl(),
- internalServiceClientManager);
+ syncInternalServiceClientManager,
+ asyncInternalServiceClientManager);
queryExecution.doLogicalPlan();
System.out.printf("SQL: %s%n%n", querySql);
System.out.println("===== Step 1: Logical Plan =====");