You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/29 09:21:14 UTC

[iotdb] 03/05: implement server

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f576b91588998c6c4e8370685e4d6b017da996e4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:31 2022 +0800

    implement server
---
 .../db/mpp/plan/parser/StatementGenerator.java     | 58 ++++++++++++++++++++
 .../statement/component/GroupByTimeComponent.java  |  2 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 62 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5a82d4541f..bf1b78d75d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.parser;
 
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -32,8 +33,10 @@ import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
@@ -61,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -97,6 +101,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -174,6 +179,59 @@ public class StatementGenerator {
     return lastQueryStatement;
   }
 
+  /**
+   * Builds a query statement applying the i-th requested aggregation to the i-th requested path.
+   * Throws {@link IllegalArgumentException} if there are fewer paths than aggregations.
+   */
+  public static Statement createStatement(TSAggregationQueryReq req, ZoneId zoneId) {
+    QueryStatement queryStatement = new QueryStatement();
+    FromComponent fromComponent = new FromComponent();
+    fromComponent.addPrefixPath(new PartialPath("", false));
+    queryStatement.setFromComponent(fromComponent);
+    List<PartialPath> selectPaths = new ArrayList<>();
+    for (TPartialPath path : req.getPaths()) {
+      selectPaths.add(new PartialPath(path.getNodes().toArray(new String[0])));
+    }
+    List<String> aggregations = req.getAggregations();
+    if (aggregations.size() > selectPaths.size()) { // else selectPaths.get(i) below would throw
+      throw new IllegalArgumentException(
+          "aggregations (" + aggregations.size() + ") exceed paths (" + selectPaths.size() + ")");
+    }
+    SelectComponent selectComponent = new SelectComponent(zoneId);
+    for (int i = 0; i < aggregations.size(); i++) {
+      selectComponent.addResultColumn(
+          new ResultColumn(
+              new FunctionExpression(
+                  aggregations.get(i),
+                  new LinkedHashMap<>(),
+                  Collections.singletonList(new TimeSeriesOperand(selectPaths.get(i)))),
+              ResultColumn.ColumnType.AGGREGATION));
+    }
+    queryStatement.setSelectComponent(selectComponent);
+
+    if (req.isSetInterval()) {
+      GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
+      groupByTimeComponent.setStartTime(req.getStartTime());
+      groupByTimeComponent.setEndTime(req.getEndTime());
+      groupByTimeComponent.setInterval(req.getInterval());
+      groupByTimeComponent.setSlidingStep(
+          req.isSetSlidingStep() ? req.getSlidingStep() : req.getInterval());
+      queryStatement.setGroupByTimeComponent(groupByTimeComponent);
+    } else if (req.isSetStartTime()) { // no grouping: filter on startTime <= time < endTime
+      ConstantOperand lowerBound =
+          new ConstantOperand(TSDataType.INT64, Long.toString(req.getStartTime()));
+      ConstantOperand upperBound =
+          new ConstantOperand(TSDataType.INT64, Long.toString(req.getEndTime()));
+      WhereCondition whereCondition = new WhereCondition();
+      whereCondition.setPredicate(
+          new LogicAndExpression(
+              new GreaterEqualExpression(new TimestampOperand(), lowerBound),
+              new LessThanExpression(new TimestampOperand(), upperBound)));
+      queryStatement.setWhereCondition(whereCondition);
+    }
+    return queryStatement;
+  }
+
   public static Statement createStatement(TSInsertRecordReq insertRecordReq)
       throws IllegalPathException, QueryProcessException {
     // construct insert statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
index d17a544fbf..fc5d8a5fdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
@@ -39,7 +39,7 @@ public class GroupByTimeComponent extends StatementNode {
   private boolean isSlidingStepByMonth = false;
 
   // if it is left close and right open interval
-  private boolean leftCRightO;
+  private boolean leftCRightO = true;
 
   public GroupByTimeComponent() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 29d07a462d..4c843b1ae3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
@@ -388,6 +389,67 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return executeLastDataQueryInternal(req, SELECT_RESULT);
   }
 
+  /**
+   * Executes the aggregation query described by {@code req} for the current session.
+   *
+   * <p>Returns early with an error status when the session is not logged in or fails the
+   * authority check; any exception raised afterwards is converted into an error response.
+   */
+  @Override
+  public TSExecuteStatementResp executeAggregationQuery(TSAggregationQueryReq req)
+      throws TException {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long queryId = Long.MIN_VALUE; // placeholder until a real id is requested below
+    boolean queryFinished = false;
+    long beginMillis = System.currentTimeMillis();
+    try {
+      Statement statement =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+      TSStatus authStatus =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
+      if (authStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(authStatus);
+      }
+      queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+      try (SetThreadName ignored = new SetThreadName(result.queryId.getId())) {
+        if (!queryExecution.isQuery()) {
+          return RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        TSExecuteStatementResp resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+        resp.setStatus(result.status);
+        queryFinished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
+        resp.setMoreData(!queryFinished);
+        return resp;
+      }
+    } catch (Exception e) {
+      queryFinished = true; // makes the finally block clean up the query execution
+      // NOTE(review): the operation label is EXECUTE_LAST_DATA_QUERY; confirm that is intended.
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, beginMillis);
+      if (queryFinished) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
   @Override
   public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
     long startTime = System.currentTimeMillis();