You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/26 08:33:35 UTC
[iotdb] 01/01: [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 115c8323758a14ec054c1e60252f3977f25402c4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 26 16:31:58 2022 +0800
[IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 17 +--
.../iotdb/db/mpp/buffer/DataBlockService.java | 8 +-
.../mpp/buffer/DataBlockServiceClientFactory.java | 117 ---------------------
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 31 ++++--
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 30 ++++--
.../apache/iotdb/db/mpp/execution/Coordinator.java | 19 +++-
.../iotdb/db/mpp/execution/QueryExecution.java | 15 ++-
.../scheduler/AbstractFragInsStateTracker.java | 39 ++++---
.../mpp/execution/scheduler/ClusterScheduler.java | 14 ++-
.../scheduler/FixedRateFragInsStateTracker.java | 11 +-
.../scheduler/InternalServiceClientFactory.java | 112 --------------------
.../scheduler/SimpleFragInstanceDispatcher.java | 50 +++++----
.../execution/scheduler/SimpleQueryTerminator.java | 31 ++++--
.../execution/scheduler/StandaloneScheduler.java | 12 ++-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 51 ++++++---
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 65 ++++++++----
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 24 ++++-
17 files changed, 300 insertions(+), 346 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index d55de6f780..e4d38df2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -259,7 +261,8 @@ public class DataBlockManager implements IDataBlockManager {
private final LocalMemoryManager localMemoryManager;
private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
private final ExecutorService executorService;
- private final DataBlockServiceClientFactory clientFactory;
+ private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+ dataBlockServiceClientManager;
private final Map<TFragmentInstanceId, Map<String, SourceHandle>> sourceHandles;
private final Map<TFragmentInstanceId, SinkHandle> sinkHandles;
@@ -269,11 +272,11 @@ public class DataBlockManager implements IDataBlockManager {
LocalMemoryManager localMemoryManager,
Supplier<TsBlockSerde> tsBlockSerdeFactory,
ExecutorService executorService,
- DataBlockServiceClientFactory clientFactory) {
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
this.executorService = Validate.notNull(executorService);
- this.clientFactory = Validate.notNull(clientFactory);
+ this.dataBlockServiceClientManager = Validate.notNull(dataBlockServiceClientManager);
sourceHandles = new ConcurrentHashMap<>();
sinkHandles = new ConcurrentHashMap<>();
}
@@ -311,9 +314,9 @@ public class DataBlockManager implements IDataBlockManager {
localFragmentInstanceId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
- new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
+ new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
+ dataBlockServiceClientManager);
sinkHandles.put(localFragmentInstanceId, sinkHandle);
return sinkHandle;
}
@@ -349,9 +352,9 @@ public class DataBlockManager implements IDataBlockManager {
localPlanNodeId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
- new SourceHandleListenerImpl(onFailureCallback));
+ new SourceHandleListenerImpl(onFailureCallback),
+ dataBlockServiceClientManager);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
.put(localPlanNodeId, sourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index cabb725b95..7f8c6a6f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -26,6 +29,7 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
@@ -67,7 +71,9 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
new LocalMemoryManager(),
new TsBlockSerdeFactory(),
executorService,
- new DataBlockServiceClientFactory());
+ new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
deleted file mode 100644
index a363fed2bb..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.buffer;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DataBlockServiceClientFactory {
-
- private static final int TIMEOUT_MS = 30_000;
-
- private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
-
- // TODO need to be replaced by mature client pool in the future
- private static final Map<TEndPoint, DataBlockService.Iface> dataBlockServiceClientMap =
- new ConcurrentHashMap<>();
-
- public DataBlockService.Iface getDataBlockServiceClient(TEndPoint endpoint) {
-
- return dataBlockServiceClientMap.computeIfAbsent(
- endpoint,
- address -> {
- TTransport transport;
- try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- // as there is a try-catch already, we do not need to use TSocket.wrap
- address.getIp(), address.getPort(), TIMEOUT_MS);
- transport.open();
- TProtocol protocol =
- IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
- ? new TCompactProtocol(transport)
- : new TBinaryProtocol(transport);
- return newSynchronizedClient(new DataBlockService.Client(protocol));
- } catch (TTransportException e) {
- logger.error(
- "error happened while creating mpp service client for {}:{}",
- endpoint.getIp(),
- endpoint.getPort(),
- e);
- throw new RuntimeException(e);
- }
- });
- }
-
- public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
- return (DataBlockService.Iface)
- Proxy.newProxyInstance(
- DataBlockServiceClientFactory.class.getClassLoader(),
- new Class[] {DataBlockService.Iface.class},
- new SynchronizedHandler(client));
- }
-
- private static class SynchronizedHandler implements InvocationHandler {
-
- private final DataBlockService.Iface client;
-
- public SynchronizedHandler(DataBlockService.Iface client) {
- this.client = client;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- synchronized (client) {
- return method.invoke(client, args);
- }
- } catch (InvocationTargetException e) {
- // all IFace APIs throw TException
- if (e.getTargetException() instanceof TException) {
- throw e.getTargetException();
- } else {
- // should not happen
- throw new TException(
- "Error in calling method " + method.getName(), e.getTargetException());
- }
- } catch (Exception e) {
- throw new TException("Error in calling method " + method.getName(), e);
- }
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 5f734747b5..18736727ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -60,7 +61,6 @@ public class SinkHandle implements ISinkHandle {
private final TFragmentInstanceId localFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
- private final DataBlockService.Iface client;
private final TsBlockSerde serde;
private final SinkHandleListener sinkHandleListener;
@@ -69,6 +69,9 @@ public class SinkHandle implements ISinkHandle {
// 2. Fast lookup.
private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
+ private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+ dataBlockServiceClientManager;
+
private volatile ListenableFuture<Void> blocked = immediateFuture(null);
private int nextSequenceId = 0;
private long bufferRetainedSizeInBytes = 0;
@@ -82,18 +85,18 @@ public class SinkHandle implements ISinkHandle {
TFragmentInstanceId localFragmentInstanceId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
- DataBlockService.Iface client,
TsBlockSerde serde,
- SinkHandleListener sinkHandleListener) {
+ SinkHandleListener sinkHandleListener,
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
- this.client = Validate.notNull(client);
this.serde = Validate.notNull(serde);
this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+ this.dataBlockServiceClientManager = dataBlockServiceClientManager;
}
@Override
@@ -152,7 +155,7 @@ public class SinkHandle implements ISinkHandle {
throw new UnsupportedOperationException();
}
- private void sendEndOfDataBlockEvent() throws TException {
+ private void sendEndOfDataBlockEvent() throws Exception {
logger.debug(
"Send end of data block event to plan node {} of {}. {}",
remotePlanNodeId,
@@ -167,10 +170,12 @@ public class SinkHandle implements ISinkHandle {
nextSequenceId - 1);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ SyncDataNodeDataBlockServiceClient client = null;
try {
+ client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
- } catch (TException e) {
+ } catch (TException | IOException e) {
logger.error(
"Failed to send end of data block event to plan node {} of {} due to {}, attempt times: {}",
remotePlanNodeId,
@@ -181,6 +186,10 @@ public class SinkHandle implements ISinkHandle {
if (attempt == MAX_ATTEMPT_TIMES) {
throw e;
}
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
}
}
@@ -193,7 +202,7 @@ public class SinkHandle implements ISinkHandle {
}
try {
sendEndOfDataBlockEvent();
- } catch (TException e) {
+ } catch (Exception e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
synchronized (this) {
@@ -356,7 +365,9 @@ public class SinkHandle implements ISinkHandle {
blockSizes);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ SyncDataNodeDataBlockServiceClient client = null;
try {
+ client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
client.onNewDataBlockEvent(newDataBlockEvent);
break;
} catch (Throwable e) {
@@ -370,6 +381,10 @@ public class SinkHandle implements ISinkHandle {
if (attempt == MAX_ATTEMPT_TIMES) {
sinkHandleListener.onFailure(SinkHandle.this, e);
}
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index cbe5e9d540..10aaf4ee33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -33,7 +34,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +59,15 @@ public class SourceHandle implements ISourceHandle {
private final String localPlanNodeId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
- private final DataBlockService.Iface client;
private final TsBlockSerde serde;
private final SourceHandleListener sourceHandleListener;
private final Map<Integer, TsBlock> sequenceIdToTsBlock = new HashMap<>();
private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+ private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+ dataBlockServiceClientManager;
+
private volatile SettableFuture<Void> blocked = SettableFuture.create();
private long bufferRetainedSizeInBytes = 0L;
private int currSequenceId = 0;
@@ -80,19 +82,19 @@ public class SourceHandle implements ISourceHandle {
String localPlanNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
- DataBlockService.Iface client,
TsBlockSerde serde,
- SourceHandleListener sourceHandleListener) {
+ SourceHandleListener sourceHandleListener,
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
- this.client = Validate.notNull(client);
this.serde = Validate.notNull(serde);
this.sourceHandleListener = Validate.notNull(sourceHandleListener);
bufferRetainedSizeInBytes = 0L;
+ this.dataBlockServiceClientManager = dataBlockServiceClientManager;
}
@Override
@@ -315,7 +317,9 @@ public class SourceHandle implements ISourceHandle {
int attempt = 0;
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ SyncDataNodeDataBlockServiceClient client = null;
try {
+ client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
TGetDataBlockResponse resp = client.getDataBlock(req);
List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
@@ -336,7 +340,7 @@ public class SourceHandle implements ISourceHandle {
executorService.submit(
new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
break;
- } catch (TException e) {
+ } catch (Throwable e) {
logger.error(
"Failed to get data block from {} due to {}, attempt times: {}",
remoteFragmentInstanceId,
@@ -351,6 +355,10 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
}
// TODO: try to issue another GetDataBlocksTask to make the query run faster.
@@ -379,10 +387,12 @@ public class SourceHandle implements ISourceHandle {
new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ SyncDataNodeDataBlockServiceClient client = null;
try {
+ client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
break;
- } catch (TException e) {
+ } catch (Throwable e) {
logger.error(
"Failed to send ack data block event [{}, {}) to {} due to {}, attempt times: {}",
startSequenceId,
@@ -395,6 +405,10 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..3a7c367317 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -43,7 +46,7 @@ import java.util.concurrent.ScheduledExecutorService;
* QueryExecution.
*/
public class Coordinator {
- private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
private static final int COORDINATOR_EXECUTOR_SIZE = 1;
@@ -55,6 +58,12 @@ public class Coordinator {
IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+ private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ INTERNAL_SERVICE_CLIENT_MANAGER =
+ new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
@@ -78,7 +87,13 @@ public class Coordinator {
return new ConfigExecution(queryContext, statement, executor);
}
return new QueryExecution(
- statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
+ statement,
+ queryContext,
+ executor,
+ scheduledExecutor,
+ partitionFetcher,
+ schemaFetcher,
+ INTERNAL_SERVICE_CLIENT_MANAGER);
}
public ExecutionResult execute(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..f0960163dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -90,13 +92,17 @@ public class QueryExecution implements IQueryExecution {
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
public QueryExecution(
Statement statement,
MPPQueryContext context,
ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher) {
+ ISchemaFetcher schemaFetcher,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
@@ -105,6 +111,7 @@ public class QueryExecution implements IQueryExecution {
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
+ this.internalServiceClientManager = internalServiceClientManager;
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
@@ -153,14 +160,16 @@ public class QueryExecution implements IQueryExecution {
distributedPlan.getInstances(),
context.getQueryType(),
executor,
- scheduledExecutor)
+ scheduledExecutor,
+ internalServiceClientManager)
: new StandaloneScheduler(
context,
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
executor,
- scheduledExecutor);
+ scheduledExecutor,
+ internalServiceClientManager);
this.scheduler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..89e48860fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -20,17 +20,19 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -42,31 +44,44 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
public AbstractFragInsStateTracker(
QueryStateMachine stateMachine,
ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
- List<FragmentInstance> instances) {
+ List<FragmentInstance> instances,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.stateMachine = stateMachine;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
+ this.internalServiceClientManager = internalServiceClientManager;
}
public abstract void start();
public abstract void abort();
- protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
- // TODO (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- TFragmentInstanceStateResp resp =
- client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
- return FragmentInstanceState.valueOf(resp.state);
+ protected FragmentInstanceState fetchState(FragmentInstance instance)
+ throws TException, IOException {
+ SyncDataNodeInternalServiceClient client = null;
+ try {
+ // TODO: (jackie tien) change the port
+ TEndPoint endPoint =
+ new TEndPoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+ client = internalServiceClientManager.borrowClient(endPoint);
+ TFragmentInstanceStateResp resp =
+ client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
+ }
}
private TFragmentInstanceId getTId(FragmentInstance instance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 18c9b47367..1ec7398d23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -67,18 +70,21 @@ public class ClusterScheduler implements IScheduler {
List<FragmentInstance> instances,
QueryType queryType,
ExecutorService executor,
- ScheduledExecutorService scheduledExecutor) {
+ ScheduledExecutorService scheduledExecutor,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.instances = instances;
this.queryType = queryType;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
- this.dispatcher = new SimpleFragInstanceDispatcher(executor);
+ this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager);
this.stateTracker =
- new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+ new FixedRateFragInsStateTracker(
+ stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
this.queryTerminator =
- new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+ new SimpleQueryTerminator(
+ executor, queryContext.getQueryId(), instances, internalServiceClientManager);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index b9336f350e..4608b45e4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -27,6 +30,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -45,8 +49,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
QueryStateMachine stateMachine,
ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
- List<FragmentInstance> instances) {
- super(stateMachine, executor, scheduledExecutor, instances);
+ List<FragmentInstance> instances,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ super(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
}
@Override
@@ -72,7 +77,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
if (state != null) {
stateMachine.updateFragInstanceState(instance.getId(), state);
}
- } catch (TException e) {
+ } catch (TException | IOException e) {
// TODO: do nothing ?
logger.error("error happened while fetching query state", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
deleted file mode 100644
index aa35c3854b..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.scheduler;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class InternalServiceClientFactory {
- private static final int TIMEOUT_MS = 10_000;
-
- private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
-
- // TODO need to be replaced by mature client pool in the future
- private static final Map<TEndPoint, InternalService.Iface> internalServiceClientMap =
- new ConcurrentHashMap<>();
-
- public static InternalService.Iface getInternalServiceClient(TEndPoint endpoint)
- throws TTransportException {
- return internalServiceClientMap.computeIfAbsent(
- endpoint,
- address -> {
- TTransport transport;
- try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- // as there is a try-catch already, we do not need to use TSocket.wrap
- address.getIp(), address.getPort(), TIMEOUT_MS);
- transport.open();
-
- } catch (TTransportException e) {
- logger.error(
- "error happened while creating mpp service client for {}:{}",
- endpoint.getIp(),
- endpoint.getPort(),
- e);
- throw new RuntimeException(e);
- }
- TProtocol protocol = new TBinaryProtocol(transport);
- return newSynchronizedClient(new InternalService.Client(protocol));
- });
- }
-
- public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
- return (InternalService.Iface)
- Proxy.newProxyInstance(
- InternalServiceClientFactory.class.getClassLoader(),
- new Class[] {InternalService.Iface.class},
- new SynchronizedHandler(client));
- }
-
- private static class SynchronizedHandler implements InvocationHandler {
-
- private final InternalService.Iface client;
-
- public SynchronizedHandler(InternalService.Iface client) {
- this.client = client;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- synchronized (client) {
- return method.invoke(client, args);
- }
- } catch (InvocationTargetException e) {
- // all IFace APIs throw TException
- if (e.getTargetException() instanceof TException) {
- throw e.getTargetException();
- } else {
- // should not happen
- throw new TException(
- "Error in calling method " + method.getName(), e.getTargetException());
- }
- } catch (Exception e) {
- throw new TException("Error in calling method " + method.getName(), e);
- }
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..e70addd9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
@@ -40,8 +41,14 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
private final ExecutorService executor;
- public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
- this.executor = exeutor;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
+ public SimpleFragInstanceDispatcher(
+ ExecutorService executor,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ this.executor = executor;
+ this.internalServiceClientManager = internalServiceClientManager;
}
@Override
@@ -50,21 +57,28 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
() -> {
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
for (FragmentInstance instance : instances) {
- // TODO: (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- // TODO: (xingtanzjr) consider how to handle the buffer here
- ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
- instance.serializeRequest(buffer);
- buffer.flip();
- TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
- TSendFragmentInstanceReq req =
- new TSendFragmentInstanceReq(
- new TFragmentInstance(buffer), groupId, instance.getType().toString());
- resp = client.sendFragmentInstance(req);
+ SyncDataNodeInternalServiceClient client = null;
+ try {
+ // TODO: (jackie tien) change the port
+ client =
+ internalServiceClientManager.borrowClient(
+ new TEndPoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ resp = client.sendFragmentInstance(req);
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
+ }
if (!resp.accepted) {
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..abcaf1bfe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,10 +20,11 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.thrift.TException;
@@ -41,11 +42,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
private final QueryId queryId;
private final List<FragmentInstance> fragmentInstances;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
public SimpleQueryTerminator(
- ExecutorService executor, QueryId queryId, List<FragmentInstance> fragmentInstances) {
+ ExecutorService executor,
+ QueryId queryId,
+ List<FragmentInstance> fragmentInstances,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.executor = executor;
this.queryId = queryId;
this.fragmentInstances = fragmentInstances;
+ this.internalServiceClientManager = internalServiceClientManager;
}
@Override
@@ -57,12 +65,19 @@ public class SimpleQueryTerminator implements IQueryTerminator {
try {
for (TEndPoint endpoint : relatedHost) {
// TODO (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new TEndPoint(
- endpoint.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ SyncDataNodeInternalServiceClient client = null;
+ try {
+ client =
+ internalServiceClientManager.borrowClient(
+ new TEndPoint(
+ endpoint.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
+ }
}
} catch (TException e) {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index a1c31c99ab..8589f535d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -64,16 +67,19 @@ public class StandaloneScheduler implements IScheduler {
List<FragmentInstance> instances,
QueryType queryType,
ExecutorService executor,
- ScheduledExecutorService scheduledExecutor) {
+ ScheduledExecutorService scheduledExecutor,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.queryContext = queryContext;
this.instances = instances;
this.queryType = queryType;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.stateTracker =
- new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+ new FixedRateFragInsStateTracker(
+ stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
this.queryTerminator =
- new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+ new SimpleQueryTerminator(
+ executor, queryContext.getQueryId(), instances, internalServiceClientManager);
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index d45eae5db9..8ac22daac7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -58,16 +59,20 @@ public class SinkHandleTest {
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doNothing()
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -85,9 +90,9 @@ public class SinkHandleTest {
localFragmentInstanceId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
Utils.createMockTsBlockSerde(mockTsBlockSize),
- mockSinkHandleListener);
+ mockSinkHandleListener,
+ mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -185,16 +190,20 @@ public class SinkHandleTest {
SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
// Construct several mock TsBlock(s).
List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doNothing()
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -208,9 +217,9 @@ public class SinkHandleTest {
localFragmentInstanceId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
Utils.createMockTsBlockSerde(mockTsBlockSize),
- mockSinkHandleListener);
+ mockSinkHandleListener,
+ mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -353,17 +362,21 @@ public class SinkHandleTest {
SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
// Construct several mock TsBlock(s).
List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
TException mockException = new TException("Mock exception");
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doThrow(mockException)
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
Mockito.doThrow(mockException)
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -377,9 +390,9 @@ public class SinkHandleTest {
localFragmentInstanceId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
Utils.createMockTsBlockSerde(mockTsBlockSize),
- mockSinkHandleListener);
+ mockSinkHandleListener,
+ mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -454,16 +467,20 @@ public class SinkHandleTest {
SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
// Construct several mock TsBlock(s).
List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doNothing()
.when(mockClient)
.onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -477,9 +494,9 @@ public class SinkHandleTest {
localFragmentInstanceId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
Utils.createMockTsBlockSerde(mockTsBlockSize),
- mockSinkHandleListener);
+ mockSinkHandleListener,
+ mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 7f53a2f4ab..87d76463db 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.db.mpp.buffer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -36,6 +37,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -45,7 +47,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SourceHandleTest {
-
@Test
public void testNonBlockedOneTimeReceive() {
final String queryId = "q0";
@@ -62,9 +63,13 @@ public class SourceHandleTest {
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
@@ -77,7 +82,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -94,9 +99,9 @@ public class SourceHandleTest {
localPlanNodeId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
mockTsBlockSerde,
- mockSourceHandleListener);
+ mockSourceHandleListener,
+ mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertFalse(sourceHandle.isFinished());
@@ -181,9 +186,13 @@ public class SourceHandleTest {
MemoryPool spyMemoryPool =
Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
@@ -196,7 +205,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -213,9 +222,9 @@ public class SourceHandleTest {
localPlanNodeId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
mockTsBlockSerde,
- mockSourceHandleListener);
+ mockSourceHandleListener,
+ mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertFalse(sourceHandle.isFinished());
@@ -326,9 +335,13 @@ public class SourceHandleTest {
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
@@ -341,7 +354,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -354,9 +367,9 @@ public class SourceHandleTest {
localPlanNodeId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
mockTsBlockSerde,
- mockSourceHandleListener);
+ mockSourceHandleListener,
+ mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isClosed());
Assert.assertFalse(sourceHandle.isFinished());
@@ -509,14 +522,18 @@ public class SourceHandleTest {
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
TException mockException = new TException("Mock exception");
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doThrow(mockException)
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -529,9 +546,9 @@ public class SourceHandleTest {
localPlanNodeId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
mockTsBlockSerde,
- mockSourceHandleListener);
+ mockSourceHandleListener,
+ mockClientManager);
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(sourceHandle.isClosed());
@@ -579,9 +596,13 @@ public class SourceHandleTest {
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+ Mockito.mock(IClientManager.class);
// Construct a mock client.
- Client mockClient = Mockito.mock(Client.class);
+ SyncDataNodeDataBlockServiceClient mockClient =
+ Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
try {
+ Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
Mockito.doAnswer(
invocation -> {
TGetDataBlockRequest req = invocation.getArgument(0);
@@ -594,7 +615,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException e) {
+ } catch (TException | IOException e) {
e.printStackTrace();
Assert.fail();
}
@@ -611,9 +632,9 @@ public class SourceHandleTest {
localPlanNodeId,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
- mockClient,
mockTsBlockSerde,
- mockSourceHandleListener);
+ mockSourceHandleListener,
+ mockClientManager);
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index f10596f775..44e75fbbd7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.mpp.sql.plan;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -32,6 +35,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -39,6 +44,22 @@ import java.time.ZoneId;
public class QueryPlannerTest {
+ private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+
+ @BeforeClass
+ public static void setUp() {
+ internalServiceClientManager =
+ new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ @AfterClass
+ public static void destroy() {
+ internalServiceClientManager.close();
+ }
+
@Ignore
@Test
public void TestSqlToDistributedPlan() {
@@ -55,7 +76,8 @@ public class QueryPlannerTest {
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
new FakePartitionFetcherImpl(),
- new FakeSchemaFetcherImpl());
+ new FakeSchemaFetcherImpl(),
+ internalServiceClientManager);
queryExecution.doLogicalPlan();
System.out.printf("SQL: %s%n%n", querySql);
System.out.println("===== Step 1: Logical Plan =====");