You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/17 05:16:27 UTC
[iotdb] branch master updated: Make standalone mode mpp basically runnable (#5566)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2691ade495 Make standalone mode mpp basically runnable (#5566)
2691ade495 is described below
commit 2691ade495513b54f00810c6cdd97c08326293e5
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Apr 17 13:16:20 2022 +0800
Make standalone mode mpp basically runnable (#5566)
---
.../iotdb/commons/concurrent/ThreadName.java | 11 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 -----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 --
.../iotdb/db/mpp/buffer/DataBlockManager.java | 36 ++++----
.../iotdb/db/mpp/buffer/DataBlockService.java | 4 +-
.../mpp/buffer/DataBlockServiceClientFactory.java | 95 ++++++++++++++++++----
.../iotdb/db/mpp/buffer/IDataBlockManager.java | 31 +++----
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 7 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 6 +-
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 11 +++
.../apache/iotdb/db/mpp/execution/Coordinator.java | 4 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 1 -
.../iotdb/db/mpp/execution/DriverContext.java | 4 +
.../db/mpp/execution/FragmentInstanceContext.java | 7 ++
.../db/mpp/execution/FragmentInstanceManager.java | 2 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 24 +++---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 6 +-
.../iotdb/db/mpp/execution/SchemaDriver.java | 21 ++++-
.../scheduler/AbstractFragInsStateTracker.java | 10 ++-
.../scheduler/FixedRateFragInsStateTracker.java | 8 ++
.../scheduler/InternalServiceClientFactory.java | 89 +++++++++++++++++---
.../scheduler/SimpleFragInstanceDispatcher.java | 10 ++-
.../execution/scheduler/SimpleQueryTerminator.java | 9 +-
.../db/mpp/operator/process/TimeJoinOperator.java | 5 +-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 17 +---
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 2 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 46 +++++------
.../iotdb/db/service/DataNodeManagementServer.java | 4 +-
.../apache/iotdb/db/service/InternalService.java | 4 +-
.../iotdb/db/service/InternalServiceImpl.java | 6 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 2 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 4 +-
.../db/mpp/schedule/DefaultTaskSchedulerTest.java | 8 --
33 files changed, 330 insertions(+), 189 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e587d38730..33b99446aa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,12 +74,13 @@ public enum ThreadName {
CLUSTER_MONITOR("ClusterMonitor"),
CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
- DATA_NODE_MANAGEMENT_SERVER("DataNodeManagementServer"),
- DATA_NODE_MANAGEMENT_CLIENT("DataNodeManagementClient"),
+ DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
+ DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
Cluster_Monitor("ClusterMonitor"),
- DATA_BLOCK_MANAGER_SERVICE("DataBlockManagerService"),
- DATA_BLOCK_MANAGER_CLIENT("DataBlockManagerService-Client"),
- INTERNAL_SERVICE_CLIENT("InternalService-Client"),
+ DATA_BLOCK_MANAGER_RPC_SERVER("DataBlockManagerRPC"),
+ DATA_BLOCK_MANAGER_RPC_CLIENT("DataBlockManagerRPC-Client"),
+ INTERNAL_SERVICE_RPC_SERVER("InternalServiceRPC"),
+ INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client"),
;
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 856381c0d9..9540120af5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -303,17 +303,6 @@ public class IoTDBConfig {
/** How many threads can concurrently build index. When <= 0, use CPU core number. */
private int concurrentIndexBuildThread = Runtime.getRuntime().availableProcessors();
- /**
- * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
- * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
- * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
- * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
- * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
- * will be flushed to the disk file. As a result, data in one series may be divided into more than
- * one part and indexed separately. Unit: byte
- */
- private long indexBufferSize = 128 * 1024 * 1024L;
-
/**
* the index framework adopts sliding window model to preprocess the original tv list in the
* subsequence matching task.
@@ -2330,14 +2319,6 @@ public class IoTDBConfig {
return concurrentIndexBuildThread;
}
- public long getIndexBufferSize() {
- return indexBufferSize;
- }
-
- public void setIndexBufferSize(long indexBufferSize) {
- this.indexBufferSize = indexBufferSize;
- }
-
public String getIndexRootFolder() {
return indexRootFolder;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 189d65a85d..c11b14ccab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -465,12 +465,6 @@ public class IoTDBDescriptor {
"default_index_window_range",
Integer.toString(conf.getDefaultIndexWindowRange()))));
- conf.setIndexBufferSize(
- Long.parseLong(
- properties.getProperty(
- "index_buffer_size", Long.toString(conf.getIndexBufferSize()))));
- // end: index parameter setting
-
conf.setConcurrentQueryThread(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index f90fce1f3e..a5278bdfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.AcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -191,6 +193,12 @@ public class DataBlockManager implements IDataBlockManager {
/** Listen to the state changes of a sink handle. */
class SinkHandleListenerImpl implements SinkHandleListener {
+ private final FragmentInstanceContext context;
+
+ public SinkHandleListenerImpl(FragmentInstanceContext context) {
+ this.context = context;
+ }
+
@Override
public void onFinish(SinkHandle sinkHandle) {
logger.info("Release resources of finished sink handle {}", sourceHandles);
@@ -198,10 +206,13 @@ public class DataBlockManager implements IDataBlockManager {
logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
+ context.finish();
}
@Override
- public void onClosed(SinkHandle sinkHandle) {}
+ public void onClosed(SinkHandle sinkHandle) {
+ context.flushing();
+ }
@Override
public void onAborted(SinkHandle sinkHandle) {
@@ -245,11 +256,10 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
- String remoteHostname,
- int remotePort,
+ Endpoint endpoint,
TFragmentInstanceId remoteFragmentInstanceId,
- String remotePlanNodeId)
- throws IOException {
+ String remotePlanNodeId,
+ FragmentInstanceContext instanceContext) {
if (sinkHandles.containsKey(localFragmentInstanceId)) {
throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
}
@@ -262,15 +272,15 @@ public class DataBlockManager implements IDataBlockManager {
SinkHandle sinkHandle =
new SinkHandle(
- remoteHostname,
+ endpoint.toString(),
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+ clientFactory.getDataBlockServiceClient(endpoint),
tsBlockSerdeFactory.get(),
- new SinkHandleListenerImpl());
+ new SinkHandleListenerImpl(instanceContext));
sinkHandles.put(localFragmentInstanceId, sinkHandle);
return sinkHandle;
}
@@ -279,10 +289,8 @@ public class DataBlockManager implements IDataBlockManager {
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
- String remoteHostname,
- int remotePort,
- TFragmentInstanceId remoteFragmentInstanceId)
- throws IOException {
+ Endpoint endpoint,
+ TFragmentInstanceId remoteFragmentInstanceId) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -301,13 +309,13 @@ public class DataBlockManager implements IDataBlockManager {
SourceHandle sourceHandle =
new SourceHandle(
- remoteHostname,
+ endpoint.getIp(),
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+ clientFactory.getDataBlockServiceClient(endpoint),
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl());
sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index fa9b30cd8a..cabb725b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -84,7 +84,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.DATA_BLOCK_MANAGER_CLIENT.getName(),
+ ThreadName.DATA_BLOCK_MANAGER_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
@@ -95,7 +95,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_SERVICE.getName());
+ thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_RPC_SERVER.getName());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
index f289662d80..397755cd4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TException;
@@ -29,22 +29,89 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class DataBlockServiceClientFactory {
- public DataBlockService.Client getDataBlockServiceClient(String hostname, int port)
- throws IOException {
- try {
- TTransport transport = RpcTransportFactory.INSTANCE.getTransportWithNoTimeout(hostname, port);
- transport.open();
- TProtocol protocol =
- IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
- ? new TCompactProtocol(transport)
- : new TBinaryProtocol(transport);
- return new Client(protocol);
- } catch (TException e) {
- throw new IOException(e);
+
+ private static final int TIMEOUT_MS = 30_000;
+
+ private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
+
+ // TODO need to be replaced by mature client pool in the future
+ private static final Map<Endpoint, DataBlockService.Iface> dataBlockServiceClientMap =
+ new ConcurrentHashMap<>();
+
+ public DataBlockService.Iface getDataBlockServiceClient(Endpoint endpoint) {
+
+ return dataBlockServiceClientMap.computeIfAbsent(
+ endpoint,
+ address -> {
+ TTransport transport;
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ // as there is a try-catch already, we do not need to use TSocket.wrap
+ address.getIp(), address.getPort(), TIMEOUT_MS);
+ transport.open();
+ TProtocol protocol =
+ IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
+ ? new TCompactProtocol(transport)
+ : new TBinaryProtocol(transport);
+ return newSynchronizedClient(new DataBlockService.Client(protocol));
+ } catch (TTransportException e) {
+ logger.error(
+ "error happened while creating mpp service client for {}:{}",
+ endpoint.getIp(),
+ endpoint.getPort(),
+ e);
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
+ return (DataBlockService.Iface)
+ Proxy.newProxyInstance(
+ DataBlockServiceClientFactory.class.getClassLoader(),
+ new Class[] {DataBlockService.Iface.class},
+ new SynchronizedHandler(client));
+ }
+
+ private static class SynchronizedHandler implements InvocationHandler {
+
+ private final DataBlockService.Iface client;
+
+ public SynchronizedHandler(DataBlockService.Iface client) {
+ this.client = client;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ try {
+ synchronized (client) {
+ return method.invoke(client, args);
+ }
+ } catch (InvocationTargetException e) {
+ // all IFace APIs throw TException
+ if (e.getTargetException() instanceof TException) {
+ throw e.getTargetException();
+ } else {
+ // should not happen
+ throw new TException(
+ "Error in calling method " + method.getName(), e.getTargetException());
+ }
+ } catch (Exception e) {
+ throw new TException("Error in calling method " + method.getName(), e);
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index e69a4a6b9c..0eff0c6620 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -19,12 +19,10 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-
public interface IDataBlockManager {
/**
* Create a sink handle who sends data blocks to a remote downstream fragment instance in async
@@ -32,19 +30,16 @@ public interface IDataBlockManager {
*
* @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
* blocks to the sink handle.
- * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
- * sent to.
- * @param remotePort Port of the remote fragment instance where the data blocks should be sent to.
- * @param remoteFragmentInstanceId ID of the remote fragment instance.
+ * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+ * be sent to.
* @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
*/
ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
- String remoteHostname,
- int remotePort,
+ Endpoint endpoint,
TFragmentInstanceId remoteFragmentInstanceId,
- String remotePlanNodeId)
- throws TTransportException, IOException;
+ String remotePlanNodeId,
+ FragmentInstanceContext instanceContext);
/**
* Create a source handle who fetches data blocks from a remote upstream fragment instance for a
@@ -53,19 +48,15 @@ public interface IDataBlockManager {
* @param localFragmentInstanceId ID of the local fragment instance who receives data blocks from
* the source handle.
* @param localPlanNodeId The local sink plan node ID.
- * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
- * received from.
- * @param remotePort Port of the remote fragment instance where the data blocks should be received
- * from.
+ * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+ * be received from.
* @param remoteFragmentInstanceId ID of the remote fragment instance.
*/
ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
- String remoteHostname,
- int remotePort,
- TFragmentInstanceId remoteFragmentInstanceId)
- throws IOException;
+ Endpoint endpoint,
+ TFragmentInstanceId remoteFragmentInstanceId);
/**
* Release all the related resources of a fragment instance, including data blocks that are not
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 2934a6bf7a..0beab26e36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -59,7 +59,7 @@ public class SinkHandle implements ISinkHandle {
private final TFragmentInstanceId localFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
- private final DataBlockService.Client client;
+ private final DataBlockService.Iface client;
private final TsBlockSerde serde;
private final SinkHandleListener sinkHandleListener;
@@ -82,7 +82,7 @@ public class SinkHandle implements ISinkHandle {
TFragmentInstanceId localFragmentInstanceId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
- DataBlockService.Client client,
+ DataBlockService.Iface client,
TsBlockSerde serde,
SinkHandleListener sinkHandleListener) {
this.remoteHostname = Validate.notNull(remoteHostname);
@@ -359,7 +359,8 @@ public class SinkHandle implements ISinkHandle {
remotePlanNodeId,
remoteFragmentInstanceId,
e.getMessage(),
- attempt);
+ attempt,
+ e);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (this) {
throwable = e;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b577174702..802634efa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -59,7 +59,7 @@ public class SourceHandle implements ISourceHandle {
private final String localPlanNodeId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
- private final DataBlockService.Client client;
+ private final DataBlockService.Iface client;
private final TsBlockSerde serde;
private final SourceHandleListener sourceHandleListener;
@@ -83,7 +83,7 @@ public class SourceHandle implements ISourceHandle {
String localPlanNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
- DataBlockService.Client client,
+ DataBlockService.Iface client,
TsBlockSerde serde,
SourceHandleListener sourceHandleListener) {
this.remoteHostname = Validate.notNull(remoteHostname);
@@ -242,7 +242,7 @@ public class SourceHandle implements ISourceHandle {
return throwable == null
&& noMoreTsBlocks
&& numActiveGetDataBlocksTask == 0
- && nextSequenceId - 1 == lastSequenceId
+ && currSequenceId - 1 == lastSequenceId
&& sequenceIdToTsBlock.isEmpty();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 7753bc3ff1..9f3b9240c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
@@ -34,8 +35,14 @@ public class StubSinkHandle implements ISinkHandle {
private final List<TsBlock> tsBlocks = new ArrayList<>();
+ private final FragmentInstanceContext instanceContext;
+
private boolean closed = false;
+ public StubSinkHandle(FragmentInstanceContext instanceContext) {
+ this.instanceContext = instanceContext;
+ }
+
@Override
public long getBufferRetainedSizeInBytes() {
return 0;
@@ -76,7 +83,11 @@ public class StubSinkHandle implements ISinkHandle {
@Override
public void close() {
+ if (closed) {
+ return;
+ }
closed = true;
+ instanceContext.flushing();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index d92368cee4..51f3a6d6f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -44,9 +44,9 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class Coordinator {
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
- private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+ private static final int COORDINATOR_EXECUTOR_SIZE = 2;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
- private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+ private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
private static final Endpoint LOCAL_HOST =
new Endpoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 88c160dd28..785d8d8db3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -91,7 +91,6 @@ public class DataDriver implements Driver {
closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
if (isFinished) {
close();
- driverContext.finish();
}
return isFinished;
} catch (Throwable t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 2168c24f4e..8c20a2c334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -43,4 +43,8 @@ public class DriverContext {
public void finish() {
fragmentInstanceContext.finish();
}
+
+ public void flushing() {
+ fragmentInstanceContext.flushing();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 0884eb6f9c..c8f3e3ebd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -114,6 +114,13 @@ public class FragmentInstanceContext extends QueryContext {
this.endTime = System.currentTimeMillis();
}
+ public void flushing() {
+ if (state.get().isDone()) {
+ return;
+ }
+ state.set(FragmentInstanceState.FLUSHING);
+ }
+
public long getEndTime() {
return endTime;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 1af0abfece..537f330f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -60,7 +60,7 @@ public class FragmentInstanceManager {
this.instanceContext = new ConcurrentHashMap<>();
this.instanceExecution = new ConcurrentHashMap<>();
this.instanceManagementExecutor =
- IoTDBThreadPoolFactory.newScheduledThreadPool(5, "instance-management");
+ IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7df772ad9..e7e4995995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
@@ -246,19 +247,16 @@ public class QueryExecution implements IQueryExecution {
private void initResultHandle() {
if (this.resultHandle == null) {
- try {
- this.resultHandle =
- DataBlockService.getInstance()
- .getDataBlockManager()
- .createSourceHandle(
- context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
- context.getResultNodeContext().getVirtualResultNodeId().getId(),
- context.getResultNodeContext().getUpStreamEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
- context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
- } catch (IOException e) {
- stateMachine.transitionToFailed();
- }
+ this.resultHandle =
+ DataBlockService.getInstance()
+ .getDataBlockManager()
+ .createSourceHandle(
+ context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+ context.getResultNodeContext().getVirtualResultNodeId().getId(),
+ new Endpoint(
+ context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 7bcded27dc..c5bc48ef42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -33,9 +33,9 @@ import java.util.concurrent.ExecutorService;
* register listeners when the state changes of the QueryExecution.
*/
public class QueryStateMachine {
- private String name;
- private StateMachine<QueryState> queryState;
- private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+ private final String name;
+ private final StateMachine<QueryState> queryState;
+ private final Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index ea49d012e3..47edf98973 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -50,6 +50,8 @@ public class SchemaDriver implements Driver {
private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+ private boolean closed = false;
+
public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
this.root = root;
this.sinkHandle = sinkHandle;
@@ -66,7 +68,6 @@ public class SchemaDriver implements Driver {
boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
if (isFinished) {
close();
- driverContext.finish();
}
return isFinished;
} catch (Throwable t) {
@@ -146,5 +147,21 @@ public class SchemaDriver implements Driver {
}
@Override
- public void close() {}
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ if (root != null) {
+ root.close();
+ }
+ if (sinkHandle != null) {
+ sinkHandle.close();
+ }
+ } catch (Throwable t) {
+ logger.error("Failed to closed driver {}", driverContext.getId(), t);
+ driverContext.failed(t);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index b87579561c..0e59d80855 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -56,9 +58,11 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
public abstract void abort();
protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
- InternalService.Client client =
- InternalServiceClientFactory.getInternalServiceClient(
- instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+ InternalService.Iface client =
+ InternalServiceClientFactory.getMppServiceClient(
+ new Endpoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort()));
TFragmentInstanceStateResp resp =
client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 0fb90f160a..16cb7fcdda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -24,11 +24,16 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.*;
public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+
+ private static final Logger logger = LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
+
// TODO: (xingtanzjr) consider how much Interval is OK for state tracker
private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
private ScheduledFuture<?> trackTask;
@@ -59,11 +64,14 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
for (FragmentInstance instance : instances) {
try {
FragmentInstanceState state = fetchState(instance);
+ logger.info("Instance {}'s State is {}", instance.getId(), state);
+
if (state != null) {
stateMachine.updateFragInstanceState(instance.getId(), state);
}
} catch (TException e) {
// TODO: do nothing ?
+ logger.error("error happened while fetching query state", e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index 4a5a94b4bf..ce60581900 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -19,26 +19,95 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.SynchronizedHandler;
+import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class InternalServiceClientFactory {
- private static final int TIMEOUT_MS = 10000;
+ private static final int TIMEOUT_MS = 10_000;
+
+ private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
+
+ // TODO: replace this map with a mature client pool in the future
+ private static final Map<Endpoint, InternalService.Iface> mppServiceClientMap =
+ new ConcurrentHashMap<>();
- // TODO: (xingtanzjr) consider the best practice to maintain the clients
- public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+ public static InternalService.Iface getMppServiceClient(Endpoint endpoint)
throws TTransportException {
- TTransport transport =
- RpcTransportFactory.INSTANCE.getTransport(
- // as there is a try-catch already, we do not need to use TSocket.wrap
- endpoint, port, TIMEOUT_MS);
- transport.open();
- TProtocol protocol = new TBinaryProtocol(transport);
- return new InternalService.Client(protocol);
+ return mppServiceClientMap.computeIfAbsent(
+ endpoint,
+ address -> {
+ TTransport transport;
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ // as there is a try-catch already, we do not need to use TSocket.wrap
+ address.getIp(), address.getPort(), TIMEOUT_MS);
+ transport.open();
+
+ } catch (TTransportException e) {
+ logger.error(
+ "error happened while creating mpp service client for {}:{}",
+ endpoint.getIp(),
+ endpoint.getPort(),
+ e);
+ throw new RuntimeException(e);
+ }
+ TProtocol protocol = new TBinaryProtocol(transport);
+ return newSynchronizedClient(new InternalService.Client(protocol));
+ });
+ }
+
+ public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
+ return (InternalService.Iface)
+ Proxy.newProxyInstance(
+ InternalServiceClientFactory.class.getClassLoader(),
+ new Class[] {InternalService.Iface.class},
+ new SynchronizedHandler(client));
+ }
+
+ private static class SynchronizedHandler implements InvocationHandler {
+
+ private final InternalService.Iface client;
+
+ public SynchronizedHandler(InternalService.Iface client) {
+ this.client = client;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ try {
+ synchronized (client) {
+ return method.invoke(client, args);
+ }
+ } catch (InvocationTargetException e) {
+ // all IFace APIs throw TException
+ if (e.getTargetException() instanceof TException) {
+ throw e.getTargetException();
+ } else {
+ // should not happen
+ throw new TException(
+ "Error in calling method " + method.getName(), e.getTargetException());
+ }
+ } catch (Exception e) {
+ throw new TException("Error in calling method " + method.getName(), e);
+ }
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index fa81d65764..bdece41164 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -47,10 +48,11 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
try {
for (FragmentInstance instance : instances) {
- InternalService.Client client =
- InternalServiceClientFactory.getInternalServiceClient(
- instance.getHostEndpoint().getIp(),
- IoTDBDescriptor.getInstance().getConfig().getMppPort());
+ InternalService.Iface client =
+ InternalServiceClientFactory.getMppServiceClient(
+ new Endpoint(
+ instance.getHostEndpoint().getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort()));
// TODO: (xingtanzjr) consider how to handle the buffer here
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 60d1e9c5a5..ca5e1f2230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -54,9 +55,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
() -> {
try {
for (Endpoint endpoint : relatedHost) {
- InternalService.Client client =
- InternalServiceClientFactory.getInternalServiceClient(
- endpoint.getIp(), endpoint.getPort());
+ InternalService.Iface client =
+ InternalServiceClientFactory.getMppServiceClient(
+ new Endpoint(
+ endpoint.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort()));
client.cancelQuery(new TCancelQueryReq(queryId.getId()));
}
} catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index f58af90e77..c5fbaf10b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -122,8 +122,9 @@ public class TimeJoinOperator implements ProcessOperator {
}
if (timeSelector.isEmpty()) {
- // TODO need to discuss whether to return null or return an empty TSBlock with TsBlockMetadata
- return null;
+ // return empty TsBlock
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
+ return tsBlockBuilder.build();
}
TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index b2200b7690..c7bdb95285 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -33,11 +33,8 @@ import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +66,8 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
private static final int MAX_CAPACITY = 1000; // TODO: load from config files
private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
- private static final int QUERY_TIMEOUT_MS = 10000; // TODO: load from config files or requests
+ private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
private final ThreadGroup workerGroups;
- private InternalService.Client mppServiceClient; // TODO: use from client pool
private final List<AbstractExecutor> threads;
private FragmentInstanceScheduler() {
@@ -242,11 +238,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
this.blockManager = blockManager;
}
- @TestOnly
- void setMppServiceClient(InternalService.Client client) {
- this.mppServiceClient = client;
- }
-
private static class InstanceHolder {
private InstanceHolder() {}
@@ -348,12 +339,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
QueryId queryId = task.getId().getQueryId();
Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
- try {
- mppServiceClient.cancelQuery(new TCancelQueryReq(queryId.getId()));
- } catch (TException e) {
- // If coordinator cancel query failed, we should continue clean other tasks.
- logger.error("cancel query " + queryId.getId() + " failed", e);
- }
for (FragmentInstanceTask otherTask : queryRelatedTasks) {
if (task.equals(otherTask)) {
continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 783372eca6..3877706717 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -75,7 +75,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
// TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException("cannot fetch schema");
+ throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
}
TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
// TODO: (xingtanzjr) need to release this query's resource here
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 166a3077a7..b0c277f65e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -68,7 +68,6 @@ import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -301,18 +300,15 @@ public class LocalExecutionPlanner {
FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
Endpoint source = node.getUpstreamEndpoint();
- try {
- ISourceHandle sourceHandle =
- DATA_BLOCK_MANAGER.createSourceHandle(
- localInstanceId.toThrift(),
- node.getPlanNodeId().getId(),
- source.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
- remoteInstanceId.toThrift());
- return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
- } catch (IOException e) {
- throw new RuntimeException("Error happened while creating source handle", e);
- }
+ ISourceHandle sourceHandle =
+ DATA_BLOCK_MANAGER.createSourceHandle(
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ new Endpoint(
+ source.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ remoteInstanceId.toThrift());
+ return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
}
@Override
@@ -321,19 +317,17 @@ public class LocalExecutionPlanner {
Endpoint target = node.getDownStreamEndpoint();
FragmentInstanceId localInstanceId = context.instanceContext.getId();
FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
- try {
- ISinkHandle sinkHandle =
- DATA_BLOCK_MANAGER.createSinkHandle(
- localInstanceId.toThrift(),
- target.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
- targetInstanceId.toThrift(),
- node.getDownStreamPlanNodeId().getId());
- context.setSinkHandle(sinkHandle);
- return child;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while creating sink handle", e);
- }
+ ISinkHandle sinkHandle =
+ DATA_BLOCK_MANAGER.createSinkHandle(
+ localInstanceId.toThrift(),
+ new Endpoint(
+ target.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+ targetInstanceId.toThrift(),
+ node.getDownStreamPlanNodeId().getId(),
+ context.instanceContext);
+ context.setSinkHandle(sinkHandle);
+ return child;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
index 44513c40d1..6c52ac4066 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
@@ -66,7 +66,7 @@ public class DataNodeManagementServer extends ThriftService
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.DATA_NODE_MANAGEMENT_CLIENT.getName(),
+ ThreadName.DATA_NODE_MANAGEMENT_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
@@ -76,7 +76,7 @@ public class DataNodeManagementServer extends ThriftService
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+ thriftServiceThread.setName(ThreadName.DATA_NODE_MANAGEMENT_RPC_SERVER.getName());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 4414fb9110..5236d2bf87 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -61,7 +61,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.INTERNAL_SERVICE_CLIENT.getName(),
+ ThreadName.INTERNAL_SERVICE_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
@@ -72,7 +72,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_CLIENT.getName());
+ thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_RPC_SERVER.getName());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 34634d74cc..c3c398bbf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -88,7 +88,11 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
- throw new NotImplementedException();
+
+ // TODO: not implemented yet. For now we simply return null so that no
+ // NotImplementedException is logged.
+ return null;
+ // throw new NotImplementedException();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 4b86aca612..2c13c68ad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -233,7 +233,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
SCHEMA_FETCHER);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException("");
+ throw new RuntimeException("error coed: " + result.status);
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index cf299aaa1f..2703d0aa37 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -158,7 +158,7 @@ public class DataDriverTest {
dataRegion,
ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
- StubSinkHandle sinkHandle = new StubSinkHandle();
+ StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
@@ -171,7 +171,7 @@ public class DataDriverTest {
assertTrue(blocked.isDone());
}
- assertEquals(FragmentInstanceState.FINISHED, state.get());
+ assertEquals(FragmentInstanceState.FLUSHING, state.get());
List<TsBlock> result = sinkHandle.getTsBlocks();
assertEquals(13, result.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 32e474fd03..e73529aa06 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.utils.stats.CpuTimer;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import io.airlift.units.Duration;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -305,7 +304,6 @@ public class DefaultTaskSchedulerTest {
IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
manager.setBlockManager(mockDataBlockManager);
InternalService.Client mockMppServiceClient = Mockito.mock(InternalService.Client.class);
- manager.setMppServiceClient(mockMppServiceClient);
ITaskScheduler defaultScheduler = manager.getScheduler();
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId1 =
@@ -364,12 +362,6 @@ public class DefaultTaskSchedulerTest {
manager.getTimeoutQueue().push(testTask1);
defaultScheduler.toAborted(testTask1);
- try {
- Mockito.verify(mockMppServiceClient, Mockito.times(1)).cancelQuery(Mockito.any());
- } catch (TException e) {
- e.printStackTrace();
- Assert.fail();
- }
Mockito.reset(mockMppServiceClient);
Mockito.verify(mockDataBlockManager, Mockito.times(2))
.forceDeregisterFragmentInstance(Mockito.any());