You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/12/02 01:43:34 UTC

[GitHub] [iotdb] liuminghui233 commented on a diff in pull request #8279: [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator

liuminghui233 commented on code in PR #8279:
URL: https://github.com/apache/iotdb/pull/8279#discussion_r1037714252


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java:
##########
@@ -100,27 +138,59 @@ protected void insertMultiTabletsInternally(boolean needCheck) {
       }
     }
     if (insertTabletStatementList.isEmpty()) {
-      return;
+      return null;
     }
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return insertMultiTabletsStatement;
+  }
 
+  protected void executeInsertMultiTabletsStatement(
+      InsertMultiTabletsStatement insertMultiTabletsStatement) {
     if (client == null) {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
-    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
-    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      String message =
-          String.format(
-              "Error occurred while inserting tablets in SELECT INTO: %s",
-              executionStatus.getMessage());
-      throw new IntoProcessException(message);
+
+    writeOperationFuture =
+        Futures.submit(
+            () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
+  }
+
+  /** Return true if the previous write task has done. */
+  protected boolean processWriteOperationFuture() {
+    if (writeOperationFuture == null) {
+      return true;
     }
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
+    try {
+      if (!writeOperationFuture.isDone()) {
+        return false;
+      }

Review Comment:
   fixed



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java:
##########
@@ -162,42 +232,62 @@ public OperatorContext getOperatorContext() {
     return operatorContext;
   }
 
+  private boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
+
+    return writeOperationFuture.isDone();
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    if (writeOperationDone() && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (!writeOperationDone() && childBlocked.isDone()) {
+      return writeOperationFuture;
+    } else if (writeOperationDone() && !childBlocked.isDone()) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+    }

Review Comment:
   fixed



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java:
##########
@@ -48,40 +49,70 @@ public IntoOperator(
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor,
+        maxStatementSize,
+        maxReturnSize);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
   @Override
   public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+    if (!processWriteOperationFuture()) {
+      return null;
+    }
+
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
     }
+    cachedTsBlock = null;
 
     if (child.hasNext()) {
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // call child.next only once
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      if (insertMultiTabletsInternally(false)) {
+        return null;
+      }
+
+      finished = true;
       return constructResultTsBlock();
     }
   }
 
+  /** Return true if write task is submitted during processing */

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org