Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/21 12:32:38 UTC

[iotdb] 01/02: add CQState in import-check

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 02b760d0ac429b76def0b609bcca48d995047786
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 21 18:09:56 2022 +0800

    add CQState in import-check
---
 integration-test/import-control.xml                |   1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       | 165 ++++++++++-----------
 2 files changed, 78 insertions(+), 88 deletions(-)

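Note on the second file change below: it temporarily stubs executeCQ in DataNodeInternalRPCServiceImpl to return a bare success status, keeping the previous in-place query execution only as commented-out reference code. A minimal sketch of how a caller might interpret that return value, using only the status types this file already imports (the class and helper names here are hypothetical, not part of the commit):

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.TSStatusCode;

public class ExecuteCQStatusSketch {
  // Hypothetical helper: the stubbed executeCQ always reports SUCCESS_STATUS,
  // so any other code would indicate a real failure once the CQ path is rewired.
  static boolean isSuccess(TSStatus status) {
    return status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode();
  }

  public static void main(String[] args) {
    // Mirrors the value the stubbed method returns in this commit.
    TSStatus stubbed = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    System.out.println("executeCQ stub reports success: " + isSuccess(stubbed));
  }
}
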
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index c5c350e7a4..bd29bc42e2 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -57,6 +57,7 @@
     <allow class="org.apache.commons.codec.digest.DigestUtils" />
     <allow class="org.apache.iotdb.commons.trigger.service.TriggerExecutableManager" />
     <allow class="org.apache.iotdb.commons.trigger.TriggerInformation" />
+    <allow class="org.apache.iotdb.commons.cq.CQState" />
     <allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" />
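The allow entry added above lets integration tests import org.apache.iotdb.commons.cq.CQState without tripping the import-control check that restricts what the integration-test module may depend on. A minimal, hypothetical sketch of the kind of reference the rule now permits, assuming CQState is an enum and deliberately not naming specific constants:

import org.apache.iotdb.commons.cq.CQState;

public class CQStateImportSketch {
  public static void main(String[] args) {
    // Iterate the enum generically; an actual IT would compare the state
    // reported for a continuous query against one of these constants.
    for (CQState state : CQState.values()) {
      System.out.println(state.name());
    }
  }
}
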
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d014d1052a..777ee4e21d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -51,7 +50,6 @@ import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -79,14 +77,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
-import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
-import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -97,8 +88,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
-import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
@@ -167,8 +156,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.trigger.api.enums.TriggerEvent;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import com.google.common.collect.ImmutableList;
@@ -185,13 +172,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
 public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
 
@@ -600,79 +584,84 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSStatus executeCQ(TExecuteCQ req) throws TException {
 
-    long sessionId =
-        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
-
-    try {
-      QueryStatement s =
-          (QueryStatement)
-              StatementGenerator.createStatement(
-                  req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
-      if (s == null) {
-        return RpcUtils.getStatus(
-            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
-      }
-
-      // 1. add time filter in where
-      Expression timeFilter =
-          new BetweenExpression(
-              new TimestampOperand(),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
-      if (s.getWhereCondition() != null) {
-        s.getWhereCondition()
-            .setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
-      } else {
-        s.setWhereCondition(new WhereCondition(timeFilter));
-      }
-
-      // 2. add time rage in group by time
-      if (s.getGroupByTimeComponent() != null) {
-        s.getGroupByTimeComponent().setStartTime(req.startTime);
-        s.getGroupByTimeComponent().setEndTime(req.endTime);
-      }
-
-      QUERY_FREQUENCY_RECORDER.incrementAndGet();
-
-      long queryId =
-          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
-      // create and cache dataset
-      ExecutionResult result =
-          COORDINATOR.execute(
-              s,
-              queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
-              req.queryBody,
-              PARTITION_FETCHER,
-              SCHEMA_FETCHER,
-              req.getTimeout());
-
-      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
-        return result.status;
-      }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 
-      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
-      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
-        if (queryExecution != null) {
-          // consume up all the result
-          while (true) {
-            Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
-            if (!optionalTsBlock.isPresent()) {
-              break;
-            }
-          }
-        }
-        return result.status;
-      }
-    } catch (Exception e) {
-      // TODO call the coordinator to release query resource
-      return onQueryException(e, "\"" + req.queryBody + "\". " + OperationType.EXECUTE_STATEMENT);
-    } finally {
-      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
-      SESSION_MANAGER.closeSession(sessionId);
-    }
+    //    long sessionId =
+    //        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId,
+    // IoTDBConstant.ClientVersion.V_0_13);
+    //
+    //    try {
+    //      QueryStatement s =
+    //          (QueryStatement)
+    //              StatementGenerator.createStatement(
+    //                  req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
+    //      if (s == null) {
+    //        return RpcUtils.getStatus(
+    //            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
+    //      }
+    //
+    //      // 1. add time filter in where
+    //      Expression timeFilter =
+    //          new BetweenExpression(
+    //              new TimestampOperand(),
+    //              new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
+    //              new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+    //      if (s.getWhereCondition() != null) {
+    //        s.getWhereCondition()
+    //            .setPredicate(new LogicAndExpression(timeFilter,
+    // s.getWhereCondition().getPredicate()));
+    //      } else {
+    //        s.setWhereCondition(new WhereCondition(timeFilter));
+    //      }
+    //
+    //      // 2. add time rage in group by time
+    //      if (s.getGroupByTimeComponent() != null) {
+    //        s.getGroupByTimeComponent().setStartTime(req.startTime);
+    //        s.getGroupByTimeComponent().setEndTime(req.endTime);
+    //      }
+    //
+    //      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+    //
+    //      long queryId =
+    //          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
+    //      // create and cache dataset
+    //      ExecutionResult result =
+    //          COORDINATOR.execute(
+    //              s,
+    //              queryId,
+    //              SESSION_MANAGER.getSessionInfo(sessionId),
+    //              req.queryBody,
+    //              PARTITION_FETCHER,
+    //              SCHEMA_FETCHER,
+    //              req.getTimeout());
+    //
+    //      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+    //          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+    //        return result.status;
+    //      }
+    //
+    //      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    //
+    //      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+    //        if (queryExecution != null) {
+    //          // consume up all the result
+    //          while (true) {
+    //            Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+    //            if (!optionalTsBlock.isPresent()) {
+    //              break;
+    //            }
+    //          }
+    //        }
+    //        return result.status;
+    //      }
+    //    } catch (Exception e) {
+    //      // TODO call the coordinator to release query resource
+    //      return onQueryException(e, "\"" + req.queryBody + "\". " +
+    // OperationType.EXECUTE_STATEMENT);
+    //    } finally {
+    //      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+    //      SESSION_MANAGER.closeSession(sessionId);
+    //    }
   }
 
   private void cleanupQueryExecution(Long queryId) {