You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/13 13:44:03 UTC

[iotdb] branch IOTDB-4619 updated: Finish executing cq in datanode

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new 168bd9ab7d Finish executing cq in datanode
168bd9ab7d is described below

commit 168bd9ab7d5758519c74795436f813fcfab3a1f1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 13 21:43:53 2022 +0800

    Finish executing cq in datanode
---
 .../confignode/manager/cq/CQScheduleTask.java      |   9 +-
 .../iotdb/confignode/persistence/cq/CQInfo.java    |  13 ++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   6 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 132 ++++++++++++++++++++-
 .../src/main/thrift/confignode.thrift              |  13 +-
 thrift/src/main/thrift/datanode.thrift             |   3 +
 6 files changed, 164 insertions(+), 12 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 22765c622e..5c4d777157 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -52,6 +52,8 @@ public class CQScheduleTask implements Runnable {
   private final TimeoutPolicy timeoutPolicy;
   private final String queryBody;
   private final String md5;
+
+  private final String zoneId;
   private final ScheduledExecutorService executor;
 
   private final ConfigManager configManager;
@@ -74,6 +76,7 @@ public class CQScheduleTask implements Runnable {
         TimeoutPolicy.deserialize(req.timeoutPolicy),
         req.queryBody,
         md5,
+        req.zoneId,
         executor,
         configManager,
         firstExecutionTime);
@@ -89,6 +92,7 @@ public class CQScheduleTask implements Runnable {
         entry.getTimeoutPolicy(),
         entry.getQueryBody(),
         entry.getMd5(),
+        entry.getZoneId(),
         executor,
         configManager,
         entry.getLastExecutionTime() + entry.getEveryInterval());
@@ -102,6 +106,7 @@ public class CQScheduleTask implements Runnable {
       TimeoutPolicy timeoutPolicy,
       String queryBody,
       String md5,
+      String zoneId,
       ScheduledExecutorService executor,
       ConfigManager configManager,
       long executionTime) {
@@ -112,6 +117,7 @@ public class CQScheduleTask implements Runnable {
     this.timeoutPolicy = timeoutPolicy;
     this.queryBody = queryBody;
     this.md5 = md5;
+    this.zoneId = zoneId;
     this.executor = executor;
     this.configManager = configManager;
     this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval);
@@ -142,7 +148,8 @@ public class CQScheduleTask implements Runnable {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
       }
     } else {
-      TExecuteCQ executeCQReq = new TExecuteCQ(queryBody, startTime, endTime);
+      TExecuteCQ executeCQReq =
+          new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId);
       try {
         AsyncDataNodeInternalServiceClient client =
             AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index d9e5937ee9..2b44de724b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -275,6 +275,8 @@ public class CQInfo implements SnapshotProcessor {
     private final String sql;
     private final String md5;
 
+    private final String zoneId;
+
     private CQState state;
     private long lastExecutionTime;
 
@@ -289,6 +291,7 @@ public class CQInfo implements SnapshotProcessor {
           req.queryBody,
           req.sql,
           md5,
+          req.zoneId,
           CQState.INACTIVE,
           lastExecutionTime);
     }
@@ -304,6 +307,7 @@ public class CQInfo implements SnapshotProcessor {
           other.queryBody,
           other.sql,
           other.md5,
+          other.zoneId,
           other.state,
           other.lastExecutionTime);
     }
@@ -318,6 +322,7 @@ public class CQInfo implements SnapshotProcessor {
         String queryBody,
         String sql,
         String md5,
+        String zoneId,
         CQState state,
         long lastExecutionTime) {
       this.cqId = cqId;
@@ -329,6 +334,7 @@ public class CQInfo implements SnapshotProcessor {
       this.queryBody = queryBody;
       this.sql = sql;
       this.md5 = md5;
+      this.zoneId = zoneId;
       this.state = state;
       this.lastExecutionTime = lastExecutionTime;
     }
@@ -343,6 +349,7 @@ public class CQInfo implements SnapshotProcessor {
       ReadWriteIOUtils.write(queryBody, stream);
       ReadWriteIOUtils.write(sql, stream);
       ReadWriteIOUtils.write(md5, stream);
+      ReadWriteIOUtils.write(zoneId, stream);
       ReadWriteIOUtils.write(state.getType(), stream);
       ReadWriteIOUtils.write(lastExecutionTime, stream);
     }
@@ -357,6 +364,7 @@ public class CQInfo implements SnapshotProcessor {
       String queryBody = ReadWriteIOUtils.readString(stream);
       String sql = ReadWriteIOUtils.readString(stream);
       String md5 = ReadWriteIOUtils.readString(stream);
+      String zoneId = ReadWriteIOUtils.readString(stream);
       CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
       long lastExecutionTime = ReadWriteIOUtils.readLong(stream);
       return new CQEntry(
@@ -369,6 +377,7 @@ public class CQInfo implements SnapshotProcessor {
           queryBody,
           sql,
           md5,
+          zoneId,
           state,
           lastExecutionTime);
     }
@@ -416,5 +425,9 @@ public class CQInfo implements SnapshotProcessor {
     public long getLastExecutionTime() {
       return lastExecutionTime;
     }
+
+    public String getZoneId() {
+      return zoneId;
+    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 15bee7ec96..808d59e7e9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -679,17 +679,17 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TSStatus createCQ(TCreateCQReq req) throws TException {
+  public TSStatus createCQ(TCreateCQReq req) {
     return configManager.createCQ(req);
   }
 
   @Override
-  public TSStatus dropCQ(TDropCQReq req) throws TException {
+  public TSStatus dropCQ(TDropCQReq req) {
     return configManager.dropCQ(req);
   }
 
   @Override
-  public TShowCQResp showCQ() throws TException {
+  public TShowCQResp showCQ() {
     return configManager.showCQ();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 969d5bd5fb..b837c4f87a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -48,7 +49,9 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -69,7 +72,21 @@ import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -80,6 +97,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.service.metrics.MetricService;
@@ -140,6 +159,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
@@ -155,15 +176,29 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
 public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
 
@@ -171,6 +206,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   public DataNodeInternalRPCServiceImpl() {
     super();
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
   }
 
   @Override
@@ -548,8 +590,94 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   @Override
   public TSStatus executeCQ(TExecuteCQ req) throws TException {
-    // TODO (tian yuan) cq execution on DataNode
-    return null;
+
+    long sessionId =
+        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
+
+    try {
+      QueryStatement s =
+          (QueryStatement)
+              StatementGenerator.createStatement(
+                  req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
+      if (s == null) {
+        return RpcUtils.getStatus(
+            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
+      }
+
+      // 1. add time filter in where
+      if (s.getWhereCondition() != null) {
+        Expression predicate = s.getWhereCondition().getPredicate();
+        Expression timeFilter =
+            new BetweenExpression(
+                new TimestampOperand(),
+                new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
+                new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+        if (predicate == null) {
+          s.getWhereCondition().setPredicate(timeFilter);
+        } else {
+          s.getWhereCondition()
+              .setPredicate(
+                  new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
+        }
+      }
+
+      // 2. add time rage in group by time
+      if (s.getGroupByTimeComponent() != null) {
+        s.getGroupByTimeComponent().setStartTime(req.startTime);
+        s.getGroupByTimeComponent().setEndTime(req.endTime);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+
+      long queryId = SESSION_MANAGER.requestQueryId(0L, true);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              req.queryBody,
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        return result.status;
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        if (queryExecution != null) {
+          // consume up all the result
+          while (true) {
+            Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+            if (!optionalTsBlock.isPresent()) {
+              break;
+            }
+          }
+        }
+        return result.status;
+      }
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return onQueryException(e, "\"" + req.queryBody + "\". " + OperationType.EXECUTE_STATEMENT);
+    } finally {
+      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+      SESSION_MANAGER.closeSession(sessionId);
+    }
+  }
+
+  private void cleanupQueryExecution(Long queryId) {
+    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    if (queryExecution != null) {
+      try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
+        LOGGER.info("[CleanUpQuery]]");
+        queryExecution.stopAndCleanup();
+        COORDINATOR.removeQueryExecution(queryId);
+      }
+    }
   }
 
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 3e1438f5f6..fac5941545 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -496,13 +496,14 @@ struct TDeleteTimeSeriesReq{
 // ====================================================
 struct TCreateCQReq {
   1: required string cqId,
-  2: required i64 everyInterval,
-  3: required i64 boundaryTime,
-  4: required i64 startTimeOffset,
-  5: required i64 endTimeOffset,
-  6: required byte timeoutPolicy,
-  7: required string queryBody,
+  2: required i64 everyInterval
+  3: required i64 boundaryTime
+  4: required i64 startTimeOffset
+  5: required i64 endTimeOffset
+  6: required byte timeoutPolicy
+  7: required string queryBody
   8: required string sql
+  9: required string zoneId
 }
 
 struct TDropCQReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 13746abc9b..d9dfa6c12d 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -297,6 +297,9 @@ struct TExecuteCQ {
   1: required string queryBody
   2: required i64 startTime
   3: required i64 endTime
+  4: required i64 timeout
+  5: required string zoneId
+  6: required string cqId
 }
 
 service IDataNodeRPCService {