You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/05/24 11:56:47 UTC

[incubator-iotdb] 01/01: add batch

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 03052d17993c0c857162d7af32173fc1af4a0fec
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri May 24 19:56:34 2019 +0800

    add batch
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 103 ++++++++++++++-------
 1 file changed, 69 insertions(+), 34 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b02f1c2..2bea855 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1,19 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.iotdb.db.service;
@@ -48,6 +44,7 @@ import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -84,6 +81,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
@@ -444,30 +442,52 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       boolean isAllSuccessful = true;
       String batchErrorMessage = "";
 
-      for (String statement : statements) {
-        try {
-          PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
-          physicalPlan.setProposer(username.get());
-          if (physicalPlan.isQuery()) {
-            return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
-                "statement is query :" + statement, result);
-          }
-          TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
-          if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
-            result.add(Statement.SUCCESS_NO_INFO);
-          } else {
+      PhysicalPlan[] physicalPlans = new PhysicalPlan[statements.size()];
+      boolean allInsert = true;
+
+      for (int i = 0; i < physicalPlans.length; i++) {
+        PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
+        physicalPlan.setProposer(username.get());
+        physicalPlans[i] = physicalPlan;
+        if (!(physicalPlan instanceof InsertPlan)) {
+          allInsert = false;
+        }
+      }
+
+      if (allInsert) {
+        Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans);
+        result = pair.left;
+        // only used when having failure
+        batchErrorMessage = pair.right;
+        if (batchErrorMessage != null) {
+            isAllSuccessful = false;
+        }
+      } else {
+        for (int i = 0; i < physicalPlans.length; i++) {
+          PhysicalPlan physicalPlan = physicalPlans[i];
+          try {
+            physicalPlan.setProposer(username.get());
+            if (physicalPlan.isQuery()) {
+              return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
+                  "statement is query :" + statements.get(i), result);
+            }
+            TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
+            if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
+              result.add(Statement.SUCCESS_NO_INFO);
+            } else {
+              result.add(Statement.EXECUTE_FAILED);
+              isAllSuccessful = false;
+              batchErrorMessage = resp.getStatus().getErrorMessage();
+            }
+          } catch (Exception e) {
+            String errMessage = String.format(
+                "Fail to generate physical plan and execute for statement "
+                    + "%s because %s",
+                statements.get(i), e.getMessage());
             result.add(Statement.EXECUTE_FAILED);
             isAllSuccessful = false;
-            batchErrorMessage = resp.getStatus().getErrorMessage();
+            batchErrorMessage = errMessage;
           }
-        } catch (Exception e) {
-          String errMessage = String.format(
-              "Fail to generate physcial plan and execute for statement "
-                  + "%s beacuse %s",
-              statement, e.getMessage());
-          result.add(Statement.EXECUTE_FAILED);
-          isAllSuccessful = false;
-          batchErrorMessage = errMessage;
         }
       }
       if (isAllSuccessful) {
@@ -482,6 +502,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  /**
+   * @param physicalPlans insert plans of one batch (NOTE: this stub does not read them yet)
+   * @return (result codes, error message); the stub yields an empty list and a null message
+   */
+  private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] physicalPlans) {
+    List<Integer> results = new ArrayList<>();
+
+    // null means all success; the caller marks the batch as failed when this is non-null
+    String message = null;
+
+
+    return new Pair<>(results, message);
+  }
+
+
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) throws TException {
     try {