You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/05/24 11:56:47 UTC
[incubator-iotdb] 01/01: add batch
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 03052d17993c0c857162d7af32173fc1af4a0fec
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri May 24 19:56:34 2019 +0800
add batch
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 103 ++++++++++++++-------
1 file changed, 69 insertions(+), 34 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b02f1c2..2bea855 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1,19 +1,15 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
@@ -48,6 +44,7 @@ import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -84,6 +81,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
@@ -444,30 +442,52 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
boolean isAllSuccessful = true;
String batchErrorMessage = "";
- for (String statement : statements) {
- try {
- PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
- physicalPlan.setProposer(username.get());
- if (physicalPlan.isQuery()) {
- return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
- "statement is query :" + statement, result);
- }
- TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
- if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
- result.add(Statement.SUCCESS_NO_INFO);
- } else {
+ PhysicalPlan[] physicalPlans = new PhysicalPlan[statements.size()];
+ boolean allInsert = true;
+
+ for (int i = 0; i < physicalPlans.length; i++) {
+ PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
+ physicalPlan.setProposer(username.get());
+ physicalPlans[i] = physicalPlan;
+ if (!(physicalPlan instanceof InsertPlan)) {
+ allInsert = false;
+ }
+ }
+
+ if (allInsert) {
+ Pair<List<Integer>, String> pair = executeBatchInsert(physicalPlans);
+ result = pair.left;
+ // only used when having failure
+ batchErrorMessage = pair.right;
+ if (batchErrorMessage != null) {
+ isAllSuccessful = false;
+ }
+ } else {
+ for (int i = 0; i < physicalPlans.length; i++) {
+ PhysicalPlan physicalPlan = physicalPlans[i];
+ try {
+ physicalPlan.setProposer(username.get());
+ if (physicalPlan.isQuery()) {
+ return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
+ "statement is query :" + statements.get(i), result);
+ }
+ TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan);
+ if (resp.getStatus().getStatusCode().equals(TS_StatusCode.SUCCESS_STATUS)) {
+ result.add(Statement.SUCCESS_NO_INFO);
+ } else {
+ result.add(Statement.EXECUTE_FAILED);
+ isAllSuccessful = false;
+ batchErrorMessage = resp.getStatus().getErrorMessage();
+ }
+ } catch (Exception e) {
+ String errMessage = String.format(
+ "Fail to generate physical plan and execute for statement "
+ + "%s because %s",
+ statements.get(i), e.getMessage());
result.add(Statement.EXECUTE_FAILED);
isAllSuccessful = false;
- batchErrorMessage = resp.getStatus().getErrorMessage();
+ batchErrorMessage = errMessage;
}
- } catch (Exception e) {
- String errMessage = String.format(
- "Fail to generate physcial plan and execute for statement "
- + "%s beacuse %s",
- statement, e.getMessage());
- result.add(Statement.EXECUTE_FAILED);
- isAllSuccessful = false;
- batchErrorMessage = errMessage;
}
}
if (isAllSuccessful) {
@@ -482,6 +502,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ /** Stub for running an all-insert batch; as written it executes none of the given plans.
+ * @param physicalPlans the insert plans of the batch (currently never read by this method)
+ * @return pair of (per-statement result codes, error message); here always (empty list, null)
+ */
+ private Pair<List<Integer>, String> executeBatchInsert(PhysicalPlan[] physicalPlans) {
+ List<Integer> results = new ArrayList<>();
+ // NOTE(review): nothing is ever added to results, so the caller receives an empty list.
+ // null means all success
+ String message = null;
+ // TODO: execute physicalPlans here, appending one code per plan and setting message on
+ // failure; the caller treats a null message as "every statement succeeded".
+ return new Pair<>(results, message);
+ }
+
+
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) throws TException {
try {