You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/17 05:33:00 UTC

[iotdb] branch master updated: [IOTDB-3201] Support RowDataQuery API for new cluster (#5922)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5032057a36 [IOTDB-3201] Support RowDataQuery API for new cluster (#5922)
5032057a36 is described below

commit 5032057a367d4b8acdb1d60f2721d7af69e703d1
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue May 17 13:32:54 2022 +0800

    [IOTDB-3201] Support RowDataQuery API for new cluster (#5922)
---
 .../db/mpp/plan/parser/StatementGenerator.java     | 39 ++++++---------
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 57 +++++++++++++++++++++-
 2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index faeb37ef7b..c542815158 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.plan.constant.FilterConstant;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
@@ -44,7 +43,12 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.qp.strategy.SQLParseError;
+import org.apache.iotdb.db.query.expression.binary.GreaterEqualExpression;
+import org.apache.iotdb.db.query.expression.binary.LessThanExpression;
+import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
+import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -71,9 +75,7 @@ import org.antlr.v4.runtime.tree.ParseTree;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
 
@@ -100,27 +102,16 @@ public class StatementGenerator {
     selectComponent.addResultColumn(new ResultColumn(new TimeSeriesOperand(new PartialPath(""))));
 
     // set query filter
-    QueryFilter queryFilter = new QueryFilter(FilterConstant.FilterType.KW_AND);
-    PartialPath timePath = new PartialPath(TIME);
-    queryFilter.setSinglePath(timePath);
-    Set<PartialPath> pathSet = new HashSet<>();
-    pathSet.add(timePath);
-    queryFilter.setIsSingle(true);
-    queryFilter.setPathSet(pathSet);
-
-    BasicFunctionFilter left =
-        new BasicFunctionFilter(
-            FilterConstant.FilterType.GREATERTHANOREQUALTO,
-            timePath,
-            Long.toString(rawDataQueryReq.getStartTime()));
-    BasicFunctionFilter right =
-        new BasicFunctionFilter(
-            FilterConstant.FilterType.LESSTHAN,
-            timePath,
-            Long.toString(rawDataQueryReq.getEndTime()));
-    queryFilter.addChildOperator(left);
-    queryFilter.addChildOperator(right);
-    //    whereCondition.setQueryFilter(queryFilter);
+    GreaterEqualExpression leftPredicate =
+        new GreaterEqualExpression(
+            new TimestampOperand(),
+            new ConstantOperand(TSDataType.INT64, Long.toString(rawDataQueryReq.getStartTime())));
+    LessThanExpression rightPredicate =
+        new LessThanExpression(
+            new TimestampOperand(),
+            new ConstantOperand(TSDataType.INT64, Long.toString(rawDataQueryReq.getEndTime())));
+    LogicAndExpression predicate = new LogicAndExpression(leftPredicate, rightPredicate);
+    whereCondition.setPredicate(predicate);
 
     queryStatement.setSelectComponent(selectComponent);
     queryStatement.setFromComponent(fromComponent);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 10fe232ed8..4d403bef8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -949,7 +949,62 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
-    throw new UnsupportedOperationException();
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      // permission check: any non-success status is returned to the caller as-is
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute Row Data Query: {}", req.sessionId, req);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      QueryId id = new QueryId(String.valueOf(queryId));
+      // call COORDINATOR.execute with the statement and the query id requested above
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              id,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+      TSExecuteStatementResp resp;
+      if (queryExecution.isQuery()) {
+        resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+        resp.setStatus(result.status);
+        resp.setQueryDataSet(
+            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+      } else {
+        resp = RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      return resp;
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource when execution fails
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
   }
 
   @Override