You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/05/24 11:56:46 UTC

[incubator-iotdb] branch add_batch_insert created (now 03052d1)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 03052d1  add batch

This branch includes the following new commits:

     new 03052d1  add batch

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: add batch

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 03052d17993c0c857162d7af32173fc1af4a0fec
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri May 24 19:56:34 2019 +0800

    add batch
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 103 ++++++++++++++-------
 1 file changed, 69 insertions(+), 34 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b02f1c2..2bea855 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1,19 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.iotdb.db.service;
@@ -48,6 +44,7 @@ import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -84,6 +81,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
@@ -444,30 +442,52 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       boolean isAllSuccessful = true;
       String batchErrorMessage = "";
 
-      for (String statement : statements) {
-        try {
-          PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
-          physicalPlan.setProposer(username.get());
-          if (physicalPlan.isQuery()) {
-            return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
-                "statement is query :" + statement, result);
-          }
-          TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
-          if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
-            result.add(Statement.SUCCESS_NO_INFO);
-          } else {
+      PhysicalPlan[] physicalPlans = new PhysicalPlan[statements.size()];
+      boolean allInsert = true;
+
+      for (int i = 0; i < physicalPlans.length; i++) {
+        PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
+        physicalPlan.setProposer(username.get());
+        physicalPlans[i] = physicalPlan;
+        if (!(physicalPlan instanceof InsertPlan)) {
+          allInsert = false;
+        }
+      }
+
+      if (allInsert) {
+        Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans);
+        result = pair.left;
+        // only used when having failure
+        batchErrorMessage = pair.right;
+        if (batchErrorMessage != null) {
+            isAllSuccessful = false;
+        }
+      } else {
+        for (int i = 0; i < physicalPlans.length; i++) {
+          PhysicalPlan physicalPlan = physicalPlans[i];
+          try {
+            physicalPlan.setProposer(username.get());
+            if (physicalPlan.isQuery()) {
+              return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
+                  "statement is query :" + statements.get(i), result);
+            }
+            TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
+            if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
+              result.add(Statement.SUCCESS_NO_INFO);
+            } else {
+              result.add(Statement.EXECUTE_FAILED);
+              isAllSuccessful = false;
+              batchErrorMessage = resp.getStatus().getErrorMessage();
+            }
+          } catch (Exception e) {
+            String errMessage = String.format(
+                "Fail to generate physical plan and execute for statement "
+                    + "%s because %s",
+                statements.get(i), e.getMessage());
             result.add(Statement.EXECUTE_FAILED);
             isAllSuccessful = false;
-            batchErrorMessage = resp.getStatus().getErrorMessage();
+            batchErrorMessage = errMessage;
           }
-        } catch (Exception e) {
-          String errMessage = String.format(
-              "Fail to generate physcial plan and execute for statement "
-                  + "%s beacuse %s",
-              statement, e.getMessage());
-          result.add(Statement.EXECUTE_FAILED);
-          isAllSuccessful = false;
-          batchErrorMessage = errMessage;
         }
       }
       if (isAllSuccessful) {
@@ -482,6 +502,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  /**
+   * @param physicalPlans
+   * @return a list of return code and message
+   */
+  private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] physicalPlans) {
+    List<Integer> results = new ArrayList<>();
+
+    // null means all success
+    String message = null;
+
+
+    return new Pair<>(results, message);
+  }
+
+
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) throws TException {
     try {