You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/16 13:19:08 UTC
[iotdb] branch RowDataQueryAPI updated: Support row data query for new cluster
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch RowDataQueryAPI
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/RowDataQueryAPI by this push:
new 02f90fd3ff Support row data query for new cluster
02f90fd3ff is described below
commit 02f90fd3ff8b681f786beaa3485ef324cbcdd00c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon May 16 21:18:54 2022 +0800
Support row data query for new cluster
---
.../thrift/impl/DataNodeTSIServiceImpl.java | 53 +++++++++++++++++++++-
1 file changed, 52 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c86f88b097..9e6903744b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -948,7 +948,58 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
- throw new UnsupportedOperationException();
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s =
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute row data Query", req.sessionId);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ QueryId id = new QueryId(String.valueOf(queryId));
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ id,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
+
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ resp.setQueryDataSet(
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ return resp;
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, OperationType.EXECUTE_RAW_DATA_QUERY));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ }
}
@Override