You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/06/15 02:52:26 UTC

[incubator-iotdb] branch refactor_batch_insert created (now 59c5487)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch refactor_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 59c5487  refactor the execution path of InsertTabletPlan

This branch includes the following new commits:

     new 59c5487  refactor the execution path of InsertTabletPlan

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: refactor the execution path of InsertTabletPlan

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 59c5487cd8a9864d7cfefaedcae9e645732d2e3f
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 15 10:52:09 2020 +0800

    refactor the execution path of InsertTabletPlan
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 12 ++---
 .../engine/storagegroup/StorageGroupProcessor.java | 44 ++++++++++++------
 .../db/exception/BatchInsertionException.java      | 41 +++++++++++++++++
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |  4 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  8 +++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 53 ++++++++--------------
 6 files changed, 106 insertions(+), 56 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 758c77f..544ea7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -289,20 +290,19 @@ public class StorageEngine implements IService {
    *
    * @return result of each row
    */
-  public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws StorageEngineException {
+  public void insertTablet(InsertTabletPlan insertTabletPlan)
+      throws StorageEngineException, BatchInsertionException {
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
     } catch (StorageEngineException e) {
-      logger.warn("get StorageGroupProcessor of device {} failed, because {}",
-          insertTabletPlan.getDeviceId(),
-          e.getMessage(), e);
-      throw new StorageEngineException(e);
+      throw new StorageEngineException(String.format("Get StorageGroupProcessor of device %s "
+          + "failed", insertTabletPlan.getDeviceId()), e);
     }
 
     // TODO monitor: update statistics
     try {
-      return storageGroupProcessor.insertTablet(insertTabletPlan);
+      storageGroupProcessor.insertTablet(insertTabletPlan);
     } catch (WriteProcessException e) {
       throw new StorageEngineException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8ba9020..e115c9f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -25,6 +25,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -62,6 +63,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.MergeException;
@@ -622,10 +624,19 @@ public class StorageGroupProcessor {
     }
   }
 
-  public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException {
+  /**
+   * Insert a tablet (rows belonging to the same device) into this storage group.
+   * @param insertTabletPlan the tablet plan whose rows are to be inserted
+   * @throws WriteProcessException when updating the last cache fails
+   * @throws BatchInsertionException if some of the rows failed to be inserted
+   */
+  public void insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException,
+      BatchInsertionException {
     writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+      boolean noFailure = true;
 
       /*
        * assume that batch has been sorted by client
@@ -638,13 +649,14 @@ public class StorageGroupProcessor {
           results[loc] = RpcUtils.getStatus(TSStatusCode.OUT_OF_TTL_ERROR,
               "time " + currTime + " in current line is out of TTL: " + dataTTL);
           loc++;
+          noFailure = false;
         } else {
           break;
         }
       }
       // loc pointing at first legal position
       if (loc == insertTabletPlan.getRowCount()) {
-        return results;
+        throw new BatchInsertionException(results);
       }
       // before is first start point
       int before = loc;
@@ -659,12 +671,12 @@ public class StorageGroupProcessor {
       while (loc < insertTabletPlan.getRowCount()) {
         long time = insertTabletPlan.getTimes()[loc];
         long curTimePartition = StorageEngine.getTimePartition(time);
-        results[loc] = RpcUtils.SUCCESS_STATUS;
         // start next partition
         if (curTimePartition != beforeTimePartition) {
           // insert last time partition
-          insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
-              beforeTimePartition);
+          noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
+              results,
+              beforeTimePartition) && noFailure;
           // re initialize
           before = loc;
           beforeTimePartition = curTimePartition;
@@ -678,8 +690,8 @@ public class StorageGroupProcessor {
           // judge if we should insert sequence
           if (!isSequence && time > lastFlushTime) {
             // insert into unsequence and then start sequence
-            insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
-                beforeTimePartition);
+            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
+                beforeTimePartition) && noFailure;
             before = loc;
             isSequence = true;
           }
@@ -689,14 +701,16 @@ public class StorageGroupProcessor {
 
       // do not forget last part
       if (before < loc) {
-        insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
-            beforeTimePartition);
+        noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
+            results, beforeTimePartition) && noFailure;
       }
       long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
           insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
       tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
 
-      return results;
+      if (!noFailure) {
+        throw new BatchInsertionException(results);
+      }
     } finally {
       writeUnlock();
     }
@@ -719,12 +733,13 @@ public class StorageGroupProcessor {
    * @param end end index of rows to be inserted in insertTabletPlan
    * @param results result array
    * @param timePartitionId time partition id
+   * @return false if any failure occurs when inserting the tablet, true otherwise
    */
-  private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
+  private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
       int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) {
     // return when start >= end
     if (start >= end) {
-      return;
+      return false;
     }
 
     TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
@@ -733,14 +748,14 @@ public class StorageGroupProcessor {
         results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
             "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
       }
-      return;
+      return false;
     }
 
     try {
       tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
     } catch (WriteProcessException e) {
       logger.error("insert to TsFileProcessor error ", e);
-      return;
+      return false;
     }
 
     latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
@@ -756,6 +771,7 @@ public class StorageGroupProcessor {
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
+    return true;
   }
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java b/server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java
new file mode 100644
index 0000000..182b3e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/BatchInsertionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class BatchInsertionException extends QueryProcessException {
+
+  private TSStatus[] failingStatus;
+
+  public BatchInsertionException(TSStatus[] failingStatus) {
+    super("Batch insertion failed");
+    this.failingStatus = failingStatus;
+  }
+
+  public void setFailingStatus(TSStatus[] failingStatus) {
+    this.failingStatus = failingStatus;
+  }
+
+  public TSStatus[] getFailingStatus() {
+    return failingStatus;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 24a2629..6fb5d98 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.executor;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -93,6 +94,7 @@ public interface IPlanExecutor {
    * execute batch insert plan
    *
    * @return result of each row
+   * @throws BatchInsertionException when some of the rows failed
    */
-  TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
+  void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 2069163..dd595f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -191,6 +192,9 @@ public class PlanExecutor implements IPlanExecutor {
       case INSERT:
         insert((InsertPlan) plan);
         return true;
+      case BATCHINSERT:
+        insertTablet((InsertTabletPlan) plan);
+        return true;
       case CREATE_ROLE:
       case DELETE_ROLE:
       case CREATE_USER:
@@ -1065,7 +1069,7 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
-  public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+  public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
     MNode node = null;
     try {
       String[] measurementList = insertTabletPlan.getMeasurements();
@@ -1106,7 +1110,7 @@ public class PlanExecutor implements IPlanExecutor {
         measurementList[i] = measurementNode.getName();
       }
       insertTabletPlan.setSchemas(schemas);
-      return StorageEngine.getInstance().insertTablet(insertTabletPlan);
+      StorageEngine.getInstance().insertTablet(insertTabletPlan);
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d17457e..7d4e735 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
+import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -1181,29 +1182,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
 
-      boolean isAllSuccessful = true;
       TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
       if (status != null) {
         return RpcUtils.getTSBatchExecuteStatementResp(status);
       }
-      TSStatus[] tsStatusArray = executor.insertTablet(insertTabletPlan);
-
-      for (TSStatus tsStatus : tsStatusArray) {
-        if (tsStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          isAllSuccessful = false;
-          break;
-        }
-      }
 
-      if (isAllSuccessful) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Insert one Tablet successfully");
-        }
-        return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
-      } else {
-        logger.debug("Insert one Tablet failed!");
-        return RpcUtils.getTSBatchExecuteStatementResp(Arrays.asList(tsStatusArray));
+      executeNonQuery(insertTabletPlan);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Insert one Tablet successfully");
       }
+      return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+    } catch (BatchInsertionException e) {
+      logger.debug("Insert one Tablet failed!");
+      return RpcUtils.getTSBatchExecuteStatementResp(Arrays.asList(e.getFailingStatus()));
     } catch (Exception e) {
       logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils
@@ -1235,28 +1226,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         insertTabletPlan.setRowCount(req.sizeList.get(i));
         insertTabletPlan.setDataTypes(req.typesList.get(i));
 
-        boolean isCurrentTabletSuccessful = true;
         TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
         if (status != null) {
           statusList.add(status);
           continue;
         }
-        TSStatus[] tsStatusArray = executor.insertTablet(insertTabletPlan);
-        TSStatus failed = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
 
-        for (TSStatus tsStatus : tsStatusArray) {
-          if (tsStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            isCurrentTabletSuccessful = false;
-            failed = tsStatus;
-            break;
-          }
-        }
-
-        if (isCurrentTabletSuccessful) {
-          statusList.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-        } else {
-          statusList.add(failed);
-        }
+        insertOneTablet(insertTabletPlan, statusList);
       }
       return RpcUtils.getTSBatchExecuteStatementResp(statusList);
     } catch (Exception e) {
@@ -1268,6 +1244,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  private void insertOneTablet(InsertTabletPlan plan, List<TSStatus> statusList)
+      throws QueryProcessException, StorageEngineException, StorageGroupNotSetException {
+    try {
+      executeNonQuery(plan);
+      statusList.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    } catch (BatchInsertionException e) {
+      TSStatus failed = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
+      statusList.add(failed);
+    }
+  }
+
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
     if (!checkLogin(sessionId)) {