You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/17 05:33:00 UTC
[iotdb] branch master updated: [IOTDB-3201] Support RowDataQuery API for new cluster (#5922)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5032057a36 [IOTDB-3201] Support RowDataQuery API for new cluster (#5922)
5032057a36 is described below
commit 5032057a367d4b8acdb1d60f2721d7af69e703d1
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue May 17 13:32:54 2022 +0800
[IOTDB-3201] Support RowDataQuery API for new cluster (#5922)
---
.../db/mpp/plan/parser/StatementGenerator.java | 39 ++++++---------
.../thrift/impl/DataNodeTSIServiceImpl.java | 57 +++++++++++++++++++++-
2 files changed, 71 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index faeb37ef7b..c542815158 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.plan.constant.FilterConstant;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
@@ -44,7 +43,12 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.SqlLexer;
import org.apache.iotdb.db.qp.strategy.SQLParseError;
+import org.apache.iotdb.db.query.expression.binary.GreaterEqualExpression;
+import org.apache.iotdb.db.query.expression.binary.LessThanExpression;
+import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
+import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -71,9 +75,7 @@ import org.antlr.v4.runtime.tree.ParseTree;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
@@ -100,27 +102,16 @@ public class StatementGenerator {
selectComponent.addResultColumn(new ResultColumn(new TimeSeriesOperand(new PartialPath(""))));
// set query filter
- QueryFilter queryFilter = new QueryFilter(FilterConstant.FilterType.KW_AND);
- PartialPath timePath = new PartialPath(TIME);
- queryFilter.setSinglePath(timePath);
- Set<PartialPath> pathSet = new HashSet<>();
- pathSet.add(timePath);
- queryFilter.setIsSingle(true);
- queryFilter.setPathSet(pathSet);
-
- BasicFunctionFilter left =
- new BasicFunctionFilter(
- FilterConstant.FilterType.GREATERTHANOREQUALTO,
- timePath,
- Long.toString(rawDataQueryReq.getStartTime()));
- BasicFunctionFilter right =
- new BasicFunctionFilter(
- FilterConstant.FilterType.LESSTHAN,
- timePath,
- Long.toString(rawDataQueryReq.getEndTime()));
- queryFilter.addChildOperator(left);
- queryFilter.addChildOperator(right);
- // whereCondition.setQueryFilter(queryFilter);
+ GreaterEqualExpression leftPredicate =
+ new GreaterEqualExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, Long.toString(rawDataQueryReq.getStartTime())));
+ LessThanExpression rightPredicate =
+ new LessThanExpression(
+ new TimestampOperand(),
+ new ConstantOperand(TSDataType.INT64, Long.toString(rawDataQueryReq.getEndTime())));
+ LogicAndExpression predicate = new LogicAndExpression(leftPredicate, rightPredicate);
+ whereCondition.setPredicate(predicate);
queryStatement.setSelectComponent(selectComponent);
queryStatement.setFromComponent(fromComponent);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 10fe232ed8..4d403bef8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -949,7 +949,62 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) { // Runs a raw-data query request through the coordinator and returns the first batch of results.
- throw new UnsupportedOperationException();
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) { // reject requests from sessions that are not logged in
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis(); // used in the finally block for latency and slow-query reporting
+ try {
+ Statement s =
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId())); // build the statement from the request, using the session's zone id
+
+ // permission check: any non-success status is returned to the client as-is
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet(); // counted only after the permission check has passed
+ AUDIT_LOGGER.debug("Session {} execute Row Data Query: {}", req.sessionId, req); // NOTE(review): "Row" looks like a typo for "Raw" in this log message - confirm before changing
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true); // NOTE(review): meaning of the boolean argument is not visible here - presumably marks the id as a query; verify
+ QueryId id = new QueryId(String.valueOf(queryId));
+ // create and cache dataset: hand the statement to the coordinator under the new query id
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ id,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "", // NOTE(review): presumably the SQL text slot, empty because this request carries no SQL string - confirm
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status); // caught by the catch (Exception e) below and converted into an error response
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id); // look the execution up again by the same query id
+
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) { // query: response carries the dataset header plus data converted using req.fetchSize
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ resp.setQueryDataSet(
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+ } else { // non-query: response carries only the execution status
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ return resp;
+ } catch (Exception e) { // any failure above is turned into an error response instead of propagating to the caller
+ // TODO call the coordinator to release query resource (nothing in this method releases it on failure yet)
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+ } finally { // runs for both normal returns and the error path inside the try
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime); // NOTE(review): recorded under EXECUTE_QUERY although the error path is labelled EXECUTE_RAW_DATA_QUERY - confirm this is intended
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) { // slow-query log; the request object is logged in place of SQL text
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ }
+ }
}
@Override