You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/07/12 03:02:31 UTC
[iotdb] branch master updated: [IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0eeabbca62 [IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)
0eeabbca62 is described below
commit 0eeabbca62fe2c1828b682b2880bcada219c8035
Author: Leping Huang <18...@163.com>
AuthorDate: Tue Jul 12 11:02:25 2022 +0800
[IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)
---
.../service/thrift/impl/ClientRPCServiceImpl.java | 54 +++++++++++++++++++++-
1 file changed, 53 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 844ffa66ad..e283146cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -602,7 +602,59 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { // Runs every statement in req in order and returns a single combined TSStatus.
- throw new UnsupportedOperationException();
+ long t1 = System.currentTimeMillis(); // start time for the whole-batch latency recorded at the end
+ List<TSStatus> results = new ArrayList<>(); // one status per statement that was executed or failed with an exception
+ boolean isAllSuccessful = true; // only ever cleared inside the catch block below
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) { // reject the whole batch when the session is not logged in
+ return getNotLoggedInStatus();
+ }
+
+ for (int i = 0; i < req.getStatements().size(); i++) { // statements are processed sequentially, in request order
+ String statement = req.getStatements().get(i);
+ try {
+ Statement s =
+ StatementGenerator.createStatement(
+ statement, SESSION_MANAGER.getZoneId(req.getSessionId())); // build the Statement from the SQL text and the session's zone id
+ if (s == null) { // a null Statement is reported as an unsupported operation
+ return RpcUtils.getStatus( // NOTE(review): returns immediately; statuses already in results are dropped and batch latency is not recorded
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported");
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status; // NOTE(review): a failed authority check also aborts the whole batch, discarding earlier results
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet(); // counted once per statement, before execution
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, s);
+
+ long queryId = SESSION_MANAGER.requestQueryId(false); // NOTE(review): meaning of the false argument is not visible here - confirm against SessionManager
+ long t2 = System.currentTimeMillis(); // start time for this single statement's latency
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ statement,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+ addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2); // per-statement latency; skipped if execute throws
+ results.add(result.status); // NOTE(review): status is not inspected, so a non-success result leaves isAllSuccessful true - confirm intended
+ } catch (Exception e) {
+ LOGGER.error("Error occurred when executing executeBatchStatement: ", e);
+ TSStatus status =
+ onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT); // convert the exception into a TSStatus tagged with the failing statement
+ if (status.getCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) { // NOTE(review): INTERNAL_SERVER_ERROR does not clear the flag, so the batch can still report success - confirm intended
+ isAllSuccessful = false;
+ }
+ results.add(status); // the failure is recorded and the loop continues with the next statement
+ }
+ }
+ addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1); // whole-batch latency; only reached when no early return happened
+ return isAllSuccessful // flag still set: one generic success status; otherwise a status built from the collected list
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully")
+ : RpcUtils.getStatus(results);
}
@Override