You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/13 13:44:03 UTC
[iotdb] branch IOTDB-4619 updated: Finish executing cq in datanode
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new 168bd9ab7d Finish executing cq in datanode
168bd9ab7d is described below
commit 168bd9ab7d5758519c74795436f813fcfab3a1f1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 13 21:43:53 2022 +0800
Finish executing cq in datanode
---
.../confignode/manager/cq/CQScheduleTask.java | 9 +-
.../iotdb/confignode/persistence/cq/CQInfo.java | 13 ++
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 132 ++++++++++++++++++++-
.../src/main/thrift/confignode.thrift | 13 +-
thrift/src/main/thrift/datanode.thrift | 3 +
6 files changed, 164 insertions(+), 12 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 22765c622e..5c4d777157 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -52,6 +52,8 @@ public class CQScheduleTask implements Runnable {
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
private final String md5;
+
+ private final String zoneId;
private final ScheduledExecutorService executor;
private final ConfigManager configManager;
@@ -74,6 +76,7 @@ public class CQScheduleTask implements Runnable {
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
md5,
+ req.zoneId,
executor,
configManager,
firstExecutionTime);
@@ -89,6 +92,7 @@ public class CQScheduleTask implements Runnable {
entry.getTimeoutPolicy(),
entry.getQueryBody(),
entry.getMd5(),
+ entry.getZoneId(),
executor,
configManager,
entry.getLastExecutionTime() + entry.getEveryInterval());
@@ -102,6 +106,7 @@ public class CQScheduleTask implements Runnable {
TimeoutPolicy timeoutPolicy,
String queryBody,
String md5,
+ String zoneId,
ScheduledExecutorService executor,
ConfigManager configManager,
long executionTime) {
@@ -112,6 +117,7 @@ public class CQScheduleTask implements Runnable {
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
this.md5 = md5;
+ this.zoneId = zoneId;
this.executor = executor;
this.configManager = configManager;
this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
@@ -142,7 +148,8 @@ public class CQScheduleTask implements Runnable {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
}
} else {
- TExecuteCQ executeCQReq = new TExecuteCQ(queryBody, startTime, endTime);
+ TExecuteCQ executeCQReq =
+ new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
try {
AsyncDataNodeInternalServiceClient client =
AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index d9e5937ee9..2b44de724b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -275,6 +275,8 @@ public class CQInfo implements SnapshotProcessor {
private final String sql;
private final String md5;
+ private final String zoneId;
+
private CQState state;
private long lastExecutionTime;
@@ -289,6 +291,7 @@ public class CQInfo implements SnapshotProcessor {
req.queryBody,
req.sql,
md5,
+ req.zoneId,
CQState.INACTIVE,
lastExecutionTime);
}
@@ -304,6 +307,7 @@ public class CQInfo implements SnapshotProcessor {
other.queryBody,
other.sql,
other.md5,
+ other.zoneId,
other.state,
other.lastExecutionTime);
}
@@ -318,6 +322,7 @@ public class CQInfo implements SnapshotProcessor {
String queryBody,
String sql,
String md5,
+ String zoneId,
CQState state,
long lastExecutionTime) {
this.cqId = cqId;
@@ -329,6 +334,7 @@ public class CQInfo implements SnapshotProcessor {
this.queryBody = queryBody;
this.sql = sql;
this.md5 = md5;
+ this.zoneId = zoneId;
this.state = state;
this.lastExecutionTime = lastExecutionTime;
}
@@ -343,6 +349,7 @@ public class CQInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(queryBody, stream);
ReadWriteIOUtils.write(sql, stream);
ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(zoneId, stream);
ReadWriteIOUtils.write(state.getType(), stream);
ReadWriteIOUtils.write(lastExecutionTime, stream);
}
@@ -357,6 +364,7 @@ public class CQInfo implements SnapshotProcessor {
String queryBody = ReadWriteIOUtils.readString(stream);
String sql = ReadWriteIOUtils.readString(stream);
String md5 = ReadWriteIOUtils.readString(stream);
+ String zoneId = ReadWriteIOUtils.readString(stream);
CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
long lastExecutionTime = ReadWriteIOUtils.readLong(stream);
return new CQEntry(
@@ -369,6 +377,7 @@ public class CQInfo implements SnapshotProcessor {
queryBody,
sql,
md5,
+ zoneId,
state,
lastExecutionTime);
}
@@ -416,5 +425,9 @@ public class CQInfo implements SnapshotProcessor {
public long getLastExecutionTime() {
return lastExecutionTime;
}
+
+ public String getZoneId() {
+ return zoneId;
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 15bee7ec96..808d59e7e9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -679,17 +679,17 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus createCQ(TCreateCQReq req) throws TException {
+ public TSStatus createCQ(TCreateCQReq req) {
return configManager.createCQ(req);
}
@Override
- public TSStatus dropCQ(TDropCQReq req) throws TException {
+ public TSStatus dropCQ(TDropCQReq req) {
return configManager.dropCQ(req);
}
@Override
- public TShowCQResp showCQ() throws TException {
+ public TShowCQResp showCQ() {
return configManager.showCQ();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 969d5bd5fb..b837c4f87a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -48,7 +49,9 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -69,7 +72,21 @@ import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -80,6 +97,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
@@ -140,6 +159,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
@@ -155,15 +176,29 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
+
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private final IPartitionFetcher PARTITION_FETCHER;
+
+ private final ISchemaFetcher SCHEMA_FETCHER;
+
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
@@ -171,6 +206,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public DataNodeInternalRPCServiceImpl() {
super();
+ if (config.isClusterMode()) {
+ PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ } else {
+ PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+ SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+ }
}
@Override
@@ -548,8 +590,94 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus executeCQ(TExecuteCQ req) throws TException {
- // TODO (tian yuan) cq execution on DataNode
- return null;
+
+ long sessionId =
+ SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
+
+ try {
+ QueryStatement s =
+ (QueryStatement)
+ StatementGenerator.createStatement(
+ req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
+ if (s == null) {
+ return RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
+ }
+
+ // 1. AND a "time BETWEEN req.startTime AND req.endTime" filter onto the where predicate. NOTE(review): nothing is added when the query has no where clause - confirm this is intended
+ if (s.getWhereCondition() != null) {
+ Expression predicate = s.getWhereCondition().getPredicate();
+ Expression timeFilter =
+ new BetweenExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
+ new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+ if (predicate == null) {
+ s.getWhereCondition().setPredicate(timeFilter);
+ } else {
+ s.getWhereCondition()
+ .setPredicate(
+ new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
+ }
+ }
+
+ // 2. set req.startTime / req.endTime as the time range of the group by time component, if present
+ if (s.getGroupByTimeComponent() != null) {
+ s.getGroupByTimeComponent().setStartTime(req.startTime);
+ s.getGroupByTimeComponent().setEndTime(req.endTime);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+
+ long queryId = SESSION_MANAGER.requestQueryId(0L, true);
+ // hand the rewritten statement to the coordinator for execution, using the timeout carried by the request
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(sessionId),
+ req.queryBody,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ return result.status;
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ if (queryExecution != null) {
+ // drain the result: keep fetching batches (which are discarded) until getBatchResult() returns an empty Optional
+ while (true) {
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
+ break;
+ }
+ }
+ }
+ return result.status;
+ }
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return onQueryException(e, "\"" + req.queryBody + "\". " + OperationType.EXECUTE_STATEMENT);
+ } finally {
+ SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+ SESSION_MANAGER.closeSession(sessionId);
+ }
+ }
+
+ private void cleanupQueryExecution(Long queryId) { // cleanup callback handed to SESSION_MANAGER.releaseSessionResource by executeCQ
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+ if (queryExecution != null) { // skip when the coordinator returns no execution for this queryId
+ try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
+ LOGGER.info("[CleanUpQuery]]");
+ queryExecution.stopAndCleanup(); // stop the execution first, then drop it from the coordinator below
+ COORDINATOR.removeQueryExecution(queryId);
+ }
+ }
}
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 3e1438f5f6..fac5941545 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -496,13 +496,14 @@ struct TDeleteTimeSeriesReq{
// ====================================================
struct TCreateCQReq {
1: required string cqId,
- 2: required i64 everyInterval,
- 3: required i64 boundaryTime,
- 4: required i64 startTimeOffset,
- 5: required i64 endTimeOffset,
- 6: required byte timeoutPolicy,
- 7: required string queryBody,
+ 2: required i64 everyInterval
+ 3: required i64 boundaryTime
+ 4: required i64 startTimeOffset
+ 5: required i64 endTimeOffset
+ 6: required byte timeoutPolicy
+ 7: required string queryBody
8: required string sql
+ 9: required string zoneId
}
struct TDropCQReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 13746abc9b..d9dfa6c12d 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -297,6 +297,9 @@ struct TExecuteCQ {
1: required string queryBody
2: required i64 startTime
3: required i64 endTime
+ 4: required i64 timeout
+ 5: required string zoneId
+ 6: required string cqId
}
service IDataNodeRPCService {