You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/21 12:32:38 UTC
[iotdb] 01/02: add CQState in import-check
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 02b760d0ac429b76def0b609bcca48d995047786
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 21 18:09:56 2022 +0800
add CQState in import-check
---
integration-test/import-control.xml | 1 +
.../impl/DataNodeInternalRPCServiceImpl.java | 165 ++++++++++-----------
2 files changed, 78 insertions(+), 88 deletions(-)
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index c5c350e7a4..bd29bc42e2 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -57,6 +57,7 @@
<allow class="org.apache.commons.codec.digest.DigestUtils" />
<allow class="org.apache.iotdb.commons.trigger.service.TriggerExecutableManager" />
<allow class="org.apache.iotdb.commons.trigger.TriggerInformation" />
+ <allow class="org.apache.iotdb.commons.cq.CQState" />
<allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" />
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d014d1052a..777ee4e21d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -51,7 +50,6 @@ import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -79,14 +77,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
-import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
-import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -97,8 +88,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
-import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
@@ -167,8 +156,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.write.record.Tablet;
import com.google.common.collect.ImmutableList;
@@ -185,13 +172,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
@@ -600,79 +584,84 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus executeCQ(TExecuteCQ req) throws TException {
- long sessionId =
- SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
-
- try {
- QueryStatement s =
- (QueryStatement)
- StatementGenerator.createStatement(
- req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
- if (s == null) {
- return RpcUtils.getStatus(
- TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
- }
-
- // 1. add time filter in where
- Expression timeFilter =
- new BetweenExpression(
- new TimestampOperand(),
- new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
- new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
- if (s.getWhereCondition() != null) {
- s.getWhereCondition()
- .setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
- } else {
- s.setWhereCondition(new WhereCondition(timeFilter));
- }
-
- // 2. add time rage in group by time
- if (s.getGroupByTimeComponent() != null) {
- s.getGroupByTimeComponent().setStartTime(req.startTime);
- s.getGroupByTimeComponent().setEndTime(req.endTime);
- }
-
- QUERY_FREQUENCY_RECORDER.incrementAndGet();
-
- long queryId =
- SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
- // create and cache dataset
- ExecutionResult result =
- COORDINATOR.execute(
- s,
- queryId,
- SESSION_MANAGER.getSessionInfo(sessionId),
- req.queryBody,
- PARTITION_FETCHER,
- SCHEMA_FETCHER,
- req.getTimeout());
-
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
- return result.status;
- }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
- try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- if (queryExecution != null) {
- // consume up all the result
- while (true) {
- Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
- if (!optionalTsBlock.isPresent()) {
- break;
- }
- }
- }
- return result.status;
- }
- } catch (Exception e) {
- // TODO call the coordinator to release query resource
- return onQueryException(e, "\"" + req.queryBody + "\". " + OperationType.EXECUTE_STATEMENT);
- } finally {
- SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
- SESSION_MANAGER.closeSession(sessionId);
- }
+ // long sessionId =
+ // SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId,
+ // IoTDBConstant.ClientVersion.V_0_13);
+ //
+ // try {
+ // QueryStatement s =
+ // (QueryStatement)
+ // StatementGenerator.createStatement(
+ // req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
+ // if (s == null) {
+ // return RpcUtils.getStatus(
+ // TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
+ // }
+ //
+ // // 1. add time filter in where
+ // Expression timeFilter =
+ // new BetweenExpression(
+ // new TimestampOperand(),
+ // new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
+ // new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+ // if (s.getWhereCondition() != null) {
+ // s.getWhereCondition()
+ // .setPredicate(new LogicAndExpression(timeFilter,
+ // s.getWhereCondition().getPredicate()));
+ // } else {
+ // s.setWhereCondition(new WhereCondition(timeFilter));
+ // }
+ //
+ // // 2. add time rage in group by time
+ // if (s.getGroupByTimeComponent() != null) {
+ // s.getGroupByTimeComponent().setStartTime(req.startTime);
+ // s.getGroupByTimeComponent().setEndTime(req.endTime);
+ // }
+ //
+ // QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ //
+ // long queryId =
+ // SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
+ // // create and cache dataset
+ // ExecutionResult result =
+ // COORDINATOR.execute(
+ // s,
+ // queryId,
+ // SESSION_MANAGER.getSessionInfo(sessionId),
+ // req.queryBody,
+ // PARTITION_FETCHER,
+ // SCHEMA_FETCHER,
+ // req.getTimeout());
+ //
+ // if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ // && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ // return result.status;
+ // }
+ //
+ // IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+ //
+ // try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ // if (queryExecution != null) {
+ // // consume up all the result
+ // while (true) {
+ // Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ // if (!optionalTsBlock.isPresent()) {
+ // break;
+ // }
+ // }
+ // }
+ // return result.status;
+ // }
+ // } catch (Exception e) {
+ // // TODO call the coordinator to release query resource
+ // return onQueryException(e, "\"" + req.queryBody + "\". " +
+ // OperationType.EXECUTE_STATEMENT);
+ // } finally {
+ // SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+ // SESSION_MANAGER.closeSession(sessionId);
+ // }
}
private void cleanupQueryExecution(Long queryId) {