You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/03/03 04:06:03 UTC

[iotdb] branch master updated: [IOTDB-1164]Optimize the executeBatch interface in JDBC (#2725)

This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9135547  [IOTDB-1164]Optimize the executeBatch interface in JDBC (#2725)
9135547 is described below

commit 913554702601b94b4b8157586e4b3863d147e62f
Author: yanhong wang <67...@users.noreply.github.com>
AuthorDate: Wed Mar 3 12:05:34 2021 +0800

    [IOTDB-1164]Optimize the executeBatch interface in JDBC (#2725)
---
 .../apache/iotdb/db/cost/statistic/Operation.java  |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 120 ++++++++++++---------
 .../iotdb/db/integration/IoTDBExecuteBatchIT.java  |  79 ++++++++++++++
 3 files changed, 152 insertions(+), 48 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/cost/statistic/Operation.java b/server/src/main/java/org/apache/iotdb/db/cost/statistic/Operation.java
index 3a12b5a..c18a827 100644
--- a/server/src/main/java/org/apache/iotdb/db/cost/statistic/Operation.java
+++ b/server/src/main/java/org/apache/iotdb/db/cost/statistic/Operation.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.cost.statistic;
 public enum Operation {
   EXECUTE_JDBC_BATCH("EXECUTE_JDBC_BATCH"),
   EXECUTE_ONE_SQL_IN_BATCH("EXECUTE_ONE_SQL_IN_BATCH"),
+  EXECUTE_ROWS_PLAN_IN_BATCH("EXECUTE_ROWS_PLAN_IN_BATCH"),
   EXECUTE_RPC_BATCH_INSERT("EXECUTE_RPC_BATCH_INSERT"),
   EXECUTE_QUERY("EXECUTE_QUERY");
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 952a86c..ad60bfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -430,61 +430,85 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return IoTDB.metaManager.getAllTimeseriesPath(path);
   }
 
-  @Override
-  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+  private boolean executeInsertRowsPlan(InsertRowsPlan insertRowsPlan, List<TSStatus> result) {
     long t1 = System.currentTimeMillis();
-    try {
-      if (!checkLogin(req.getSessionId())) {
-        return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
-      }
-
-      List<TSStatus> result = new ArrayList<>();
-      boolean isAllSuccessful = true;
-      for (String statement : req.getStatements()) {
-        long t2 = System.currentTimeMillis();
-        isAllSuccessful =
-            executeStatementInBatch(statement, result, req.getSessionId()) && isAllSuccessful;
-        Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
+    TSStatus tsStatus = executeNonQueryPlan(insertRowsPlan);
+    Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ROWS_PLAN_IN_BATCH, t1);
+    int startIndex = result.size();
+    if (startIndex > 0) {
+      startIndex = startIndex - 1;
+    }
+    for (int i = 0; i < insertRowsPlan.getRowCount(); i++) {
+      result.add(RpcUtils.SUCCESS_STATUS);
+    }
+    if (tsStatus.subStatus != null) {
+      for (Entry<Integer, TSStatus> entry : insertRowsPlan.getResults().entrySet()) {
+        result.set(startIndex + entry.getKey(), entry.getValue());
       }
-      return isAllSuccessful
-          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully")
-          : RpcUtils.getStatus(result);
-    } catch (Exception e) {
-      return onNPEOrUnexpectedException(
-          e, "executing executeBatchStatement", TSStatusCode.INTERNAL_SERVER_ERROR);
-    } finally {
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
     }
+    return tsStatus.equals(RpcUtils.SUCCESS_STATUS);
   }
 
-  // execute one statement of a batch. Currently, query is not allowed in a batch statement and
-  // on finding queries in a batch, such query will be ignored and an error will be generated
-  private boolean executeStatementInBatch(String statement, List<TSStatus> result, long sessionId) {
-    try {
-      PhysicalPlan physicalPlan =
-          processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
-      if (physicalPlan.isQuery()) {
-        throw new QueryInBatchStatementException(statement);
-      }
-      TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, sessionId);
-      if (resp.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        result.add(resp.status);
-      } else {
-        result.add(resp.status);
-        return false;
-      }
-    } catch (Exception e) {
-      TSStatus status = tryCatchQueryException(e);
-      if (status != null) {
-        result.add(status);
-        return false;
+  @Override
+  public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    long t1 = System.currentTimeMillis();
+    List<TSStatus> result = new ArrayList<>();
+    boolean isAllSuccessful = true;
+    if (!checkLogin(req.getSessionId())) {
+      return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+    }
+
+    InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
+    int index = 0;
+    for (int i = 0; i < req.getStatements().size(); i++) {
+      String statement = req.getStatements().get(i);
+      try {
+        PhysicalPlan physicalPlan =
+            processor.parseSQLToPhysicalPlan(
+                statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+        if (physicalPlan.isQuery()) {
+          throw new QueryInBatchStatementException(statement);
+        }
+
+        if (physicalPlan.getOperatorType().equals(OperatorType.INSERT)) {
+          insertRowsPlan.addOneInsertRowPlan((InsertRowPlan) physicalPlan, index);
+          index++;
+          if (i == req.getStatements().size() - 1
+              && !executeInsertRowsPlan(insertRowsPlan, result)) {
+            isAllSuccessful = false;
+          }
+        } else {
+          if (insertRowsPlan.getRowCount() > 0) {
+            if (!executeInsertRowsPlan(insertRowsPlan, result)) {
+              isAllSuccessful = false;
+            }
+            index = 0;
+            insertRowsPlan = new InsertRowsPlan();
+          }
+          long t2 = System.currentTimeMillis();
+          TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, req.getSessionId());
+          Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
+          result.add(resp.status);
+          if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            isAllSuccessful = false;
+          }
+        }
+      } catch (Exception e) {
+        TSStatus status = tryCatchQueryException(e);
+        if (status != null) {
+          result.add(status);
+          isAllSuccessful = false;
+        } else {
+          result.add(
+              onNPEOrUnexpectedException(
+                  e, "executing " + statement, TSStatusCode.INTERNAL_SERVER_ERROR));
+        }
       }
-      result.add(
-          onNPEOrUnexpectedException(
-              e, "executing " + statement, TSStatusCode.INTERNAL_SERVER_ERROR));
     }
-    return true;
+    Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
+    return isAllSuccessful
+        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully")
+        : RpcUtils.getStatus(result);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBExecuteBatchIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBExecuteBatchIT.java
new file mode 100644
index 0000000..410c95c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBExecuteBatchIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+
+public class IoTDBExecuteBatchIT {
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testJDBCExecuteBatch() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.setFetchSize(5);
+      statement.addBatch(
+          "insert into root.ln.wf01.wt01(timestamp,temperature) values(1509465600000,1.2)");
+      statement.addBatch(
+          "insert into root.ln.wf01.wt01(timestamp,temperature) values(1509465600001,2.3)");
+      statement.addBatch("delete timeseries root.ln.wf01.wt01");
+      statement.addBatch(
+          "insert into root.ln.wf01.wt01(timestamp,temperature) values(1509465600002,3.4)");
+      statement.executeBatch();
+      ResultSet resultSet = statement.executeQuery("select * from root.ln.wf01.wt01");
+      int count = 0;
+
+      String[] timestamps = {"1509465600002"};
+      String[] values = {"3.4"};
+
+      while (resultSet.next()) {
+        assertEquals(timestamps[count], resultSet.getString("Time"));
+        assertEquals(values[count], resultSet.getString("root.ln.wf01.wt01.temperature"));
+        count++;
+      }
+    } catch (SQLException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+}