You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/07/12 03:02:31 UTC

[iotdb] branch master updated: [IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eeabbca62 [IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)
0eeabbca62 is described below

commit 0eeabbca62fe2c1828b682b2880bcada219c8035
Author: Leping Huang <18...@163.com>
AuthorDate: Tue Jul 12 11:02:25 2022 +0800

    [IOTDB-3520] Support executeBatchStatement for MPP framework (#6628)
---
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 54 +++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 844ffa66ad..e283146cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -602,7 +602,59 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
-    throw new UnsupportedOperationException();
+    long t1 = System.currentTimeMillis();
+    List<TSStatus> results = new ArrayList<>();
+    boolean isAllSuccessful = true;
+    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      return getNotLoggedInStatus();
+    }
+
+    for (int i = 0; i < req.getStatements().size(); i++) {
+      String statement = req.getStatements().get(i);
+      try {
+        Statement s =
+            StatementGenerator.createStatement(
+                statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+        if (s == null) {
+          return RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported");
+        }
+        // permission check
+        TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return status;
+        }
+
+        QUERY_FREQUENCY_RECORDER.incrementAndGet();
+        AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, s);
+
+        long queryId = SESSION_MANAGER.requestQueryId(false);
+        long t2 = System.currentTimeMillis();
+        // create and cache dataset
+        ExecutionResult result =
+            COORDINATOR.execute(
+                s,
+                queryId,
+                SESSION_MANAGER.getSessionInfo(req.sessionId),
+                statement,
+                PARTITION_FETCHER,
+                SCHEMA_FETCHER);
+        addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
+        results.add(result.status);
+      } catch (Exception e) {
+        LOGGER.error("Error occurred when executing executeBatchStatement: ", e);
+        TSStatus status =
+            onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT);
+        if (status.getCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
+          isAllSuccessful = false;
+        }
+        results.add(status);
+      }
+    }
+    addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
+    return isAllSuccessful
+        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully")
+        : RpcUtils.getStatus(results);
   }
 
   @Override