You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/16 13:19:08 UTC

[iotdb] branch RowDataQueryAPI updated: Support row data query for new cluster

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch RowDataQueryAPI
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/RowDataQueryAPI by this push:
     new 02f90fd3ff Support row data query for new cluster
02f90fd3ff is described below

commit 02f90fd3ff8b681f786beaa3485ef324cbcdd00c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon May 16 21:18:54 2022 +0800

    Support row data query for new cluster
---
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 53 +++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c86f88b097..9e6903744b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -948,7 +948,58 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
-    throw new UnsupportedOperationException();
+    // Raw data query entry point: build the statement from the request, check authority, run it
+    // through the coordinator, and return the dataset header plus a data set converted with the
+    // requested fetch size. Exceptions raised inside the try block are mapped to a status.
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, req.getSessionId());
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute raw data query", req.getSessionId());
+      long queryId = SESSION_MANAGER.requestQueryId(req.getStatementId(), true);
+      QueryId id = new QueryId(String.valueOf(queryId));
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              id,
+              SESSION_MANAGER.getSessionInfo(req.getSessionId()),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      // hand the coordinator's failure status back unchanged so the client sees the real code
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+      if (!queryExecution.isQuery()) {
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      TSExecuteStatementResp resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+      resp.setStatus(result.status);
+      resp.setQueryDataSet(
+          QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.getFetchSize()));
+      return resp;
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+    }
   }
 
   @Override