You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/29 09:21:14 UTC
[iotdb] 03/05: implement server
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f576b91588998c6c4e8370685e4d6b017da996e4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:31 2022 +0800
implement server
---
.../db/mpp/plan/parser/StatementGenerator.java | 58 ++++++++++++++++++++
.../statement/component/GroupByTimeComponent.java | 2 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 62 ++++++++++++++++++++++
3 files changed, 121 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5a82d4541f..bf1b78d75d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.parser;
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -32,8 +33,10 @@ import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
@@ -61,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.SqlLexer;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -97,6 +101,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -174,6 +179,59 @@ public class StatementGenerator {
return lastQueryStatement;
}
+ public static Statement createStatement(TSAggregationQueryReq req, ZoneId zoneId) {
+ QueryStatement queryStatement = new QueryStatement();
+
+ FromComponent fromComponent = new FromComponent();
+ fromComponent.addPrefixPath(new PartialPath("", false));
+ queryStatement.setFromComponent(fromComponent);
+
+ SelectComponent selectComponent = new SelectComponent(zoneId);
+ List<PartialPath> selectPaths = new ArrayList<>();
+ for (TPartialPath path : req.getPaths()) {
+ selectPaths.add(new PartialPath(path.getNodes().toArray(new String[0])));
+ }
+ List<String> aggregations = req.getAggregations();
+ for (int i = 0; i < aggregations.size(); i++) {
+ selectComponent.addResultColumn(
+ new ResultColumn(
+ new FunctionExpression(
+ aggregations.get(i),
+ new LinkedHashMap<>(),
+ Collections.singletonList(new TimeSeriesOperand(selectPaths.get(i)))),
+ ResultColumn.ColumnType.AGGREGATION));
+ }
+ queryStatement.setSelectComponent(selectComponent);
+
+ if (req.isSetInterval()) {
+ GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
+ groupByTimeComponent.setStartTime(req.getStartTime());
+ groupByTimeComponent.setEndTime(req.getEndTime());
+ groupByTimeComponent.setInterval(req.getInterval());
+ if (req.isSetSlidingStep()) {
+ groupByTimeComponent.setSlidingStep(req.getSlidingStep());
+ } else {
+ groupByTimeComponent.setSlidingStep(req.getInterval());
+ }
+ queryStatement.setGroupByTimeComponent(groupByTimeComponent);
+ } else if (req.isSetStartTime()) {
+ WhereCondition whereCondition = new WhereCondition();
+ GreaterEqualExpression leftPredicate =
+ new GreaterEqualExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, Long.toString(req.getStartTime())));
+ LessThanExpression rightPredicate =
+ new LessThanExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, Long.toString(req.getEndTime())));
+ LogicAndExpression predicate = new LogicAndExpression(leftPredicate, rightPredicate);
+ whereCondition.setPredicate(predicate);
+ queryStatement.setWhereCondition(whereCondition);
+ }
+
+ return queryStatement;
+ }
+
public static Statement createStatement(TSInsertRecordReq insertRecordReq)
throws IllegalPathException, QueryProcessException {
// construct insert statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
index d17a544fbf..fc5d8a5fdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
@@ -39,7 +39,7 @@ public class GroupByTimeComponent extends StatementNode {
private boolean isSlidingStepByMonth = false;
// if it is left close and right open interval
- private boolean leftCRightO;
+ private boolean leftCRightO = true;
public GroupByTimeComponent() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 29d07a462d..4c843b1ae3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
@@ -388,6 +389,67 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return executeLastDataQueryInternal(req, SELECT_RESULT);
}
+ @Override
+ public TSExecuteStatementResp executeAggregationQuery(TSAggregationQueryReq req)
+ throws TException {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
+ if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s =
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
+ resp.setMoreData(!finished);
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+
+ } catch (Exception e) {
+ finished = true;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId);
+ }
+ }
+ }
+
@Override
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
long startTime = System.currentTimeMillis();