You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 02:01:25 UTC

[iotdb] 05/09: fix

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25bf877dbb03751c792772f1c56488b855578d32
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 22:09:37 2022 +0800

    fix
---
 .../operator/process/AbstractIntoOperator.java     | 62 ++++++++++------------
 .../operator/process/DeviceViewIntoOperator.java   |  7 ++-
 .../execution/operator/process/IntoOperator.java   |  7 ++-
 3 files changed, 37 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 9ec9b0d5bc..6181dab228 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -146,48 +146,40 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     isBlocked = SettableFuture.create();
     writeOperationFuture =
-        writeOperationExecutor.submit(
-            () -> {
-              LOGGER.info("");
-              return client.insertTablets(insertMultiTabletsStatement);
-            });
-
+        writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
     writeOperationFuture.addListener(
-        () -> {
-          LOGGER.info("");
-          ((SettableFuture<Void>) isBlocked).set(null);
-        },
-        writeOperationExecutor);
+        () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
   }
 
-  protected boolean handleFuture() {
-    if (writeOperationFuture != null) {
-      if (writeOperationFuture.isDone()) {
-        try {
-          TSStatus executionStatus = writeOperationFuture.get();
-          if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-              && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-            String message =
-                String.format(
-                    "Error occurred while inserting tablets in SELECT INTO: %s",
-                    executionStatus.getMessage());
-            throw new IntoProcessException(message);
-          }
-
-          for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-            generator.reset();
-          }
+  protected boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
 
-          writeOperationFuture = null;
-          return true;
-        } catch (ExecutionException | InterruptedException e) {
-          throw new IntoProcessException(e.getMessage());
-        }
-      } else {
+    try {
+      if (!writeOperationFuture.isDone()) {
         return false;
       }
+
+      TSStatus executionStatus = writeOperationFuture.get();
+      if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        String message =
+            String.format(
+                "Error occurred while inserting tablets in SELECT INTO: %s",
+                executionStatus.getMessage());
+        throw new IntoProcessException(message);
+      }
+
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        generator.reset();
+      }
+
+      writeOperationFuture = null;
+      return true;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IntoProcessException(e.getMessage());
     }
-    return true;
   }
 
   private boolean existFullStatement(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 7ef65c9e16..e36bb49168 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -76,7 +76,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleFuture()) {
+    if (!writeOperationDone()) {
       return null;
     }
 
@@ -86,7 +86,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
     cachedTsBlock = null;
 
     if (child.hasNext()) {
-      processTsBlock(child.next());
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // call child.next only once
       return null;
     } else {
       InsertMultiTabletsStatement insertMultiTabletsStatement =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 9e5c46e9fa..67a4fb0b0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,7 +60,7 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleFuture()) {
+    if (!writeOperationDone()) {
       return null;
     }
 
@@ -70,7 +70,10 @@ public class IntoOperator extends AbstractIntoOperator {
     cachedTsBlock = null;
 
     if (child.hasNext()) {
-      processTsBlock(child.next());
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // call child.next only once
       return null;
     } else {
       if (insertMultiTabletsInternally(false)) {