You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/25 09:16:19 UTC

[iotdb] branch lmh/FixIntoOperator created (now 037d74836b)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 037d74836b refactor IntoOperator

This branch includes the following new commits:

     new a8decd1ea3 revert queryThreadCount in IoTDBConfig
     new f110d1e1c3 add queryThreadCount config in IoTDBSelectIntoIT
     new c644b7557a set queryThreadCount=1 in IoTDBSelectIntoIT
     new 037d74836b refactor IntoOperator

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/04: add queryThreadCount config in IoTDBSelectIntoIT

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f110d1e1c35918cc40ce446570ce9058bb6a35bf
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 12:34:17 2022 +0800

    add queryThreadCount config in IoTDBSelectIntoIT
---
 .../src/main/java/org/apache/iotdb/it/env/MppConfig.java   |  9 +++++++++
 .../main/java/org/apache/iotdb/itbase/env/BaseConfig.java  |  8 ++++++++
 .../org/apache/iotdb/db/it/env/StandaloneEnvConfig.java    | 14 ++++++++++++++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java  |  3 +++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java  |  3 +++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java   |  5 +++++
 6 files changed, 42 insertions(+)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 114f63ce0c..5e28a0c02f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -333,4 +333,13 @@ public class MppConfig implements BaseConfig {
         "least_data_region_group_num", String.valueOf(leastDataRegionGroupNum));
     return this;
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 97bd4eb24e..b06262c66e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -354,4 +354,12 @@ public interface BaseConfig {
   default int getLeastDataRegionGroupNum() {
     return 5;
   }
+
+  default BaseConfig setQueryThreadCount(int queryThreadCount) {
+    return this;
+  }
+
+  default int getQueryThreadCount() {
+    return Runtime.getRuntime().availableProcessors();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index 32291cb01c..bb6aff4fb0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -337,4 +337,18 @@ public class StandaloneEnvConfig implements BaseConfig {
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    IoTDBDescriptor.getInstance().getConfig().setQueryThreadCount(queryThreadCount);
+    return this;
+  }
+
+  @Override
+  public int getQueryThreadCount() {
+    return IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index 33db4ce761..e6b3057a2e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -41,8 +41,10 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
     numOfPointsPerPage = ConfigFactory.getConfig().getMaxNumberOfPointsInPage();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -53,5 +55,6 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 12b8425f9d..23796e317c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -40,7 +40,9 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
   public static void setUp() throws Exception {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -50,5 +52,6 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
     EnvFactory.getEnv().cleanAfterClass();
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 6236fcd7cf..bbbaf1e330 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.it.selectinto;
 
+import org.apache.iotdb.it.env.ConfigFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
@@ -47,6 +48,7 @@ public class IoTDBSelectIntoIT {
 
   protected static int selectIntoInsertTabletPlanRowLimit;
   protected static int numOfPointsPerPage;
+  protected static int queryThreadCount;
 
   protected static final String[] SQLs =
       new String[] {
@@ -106,6 +108,8 @@ public class IoTDBSelectIntoIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -113,6 +117,7 @@ public class IoTDBSelectIntoIT {
   @AfterClass
   public static void tearDown() throws Exception {
     EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 
   // -------------------------------------- ALIGN BY TIME ---------------------------------------


[iotdb] 01/04: revert queryThreadCount in IoTDBConfig

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a8decd1ea3372b23bae5a106826baa247ed20389
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 12:27:27 2022 +0800

    revert queryThreadCount in IoTDBConfig
---
 server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6f84eb6608..b76e8a02b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -310,7 +310,7 @@ public class IoTDBConfig {
   private int flushThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
-  private int queryThreadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
+  private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;


[iotdb] 03/04: set queryThreadCount=1 in IoTDBSelectIntoIT

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c644b7557a22937453b6afbc49f5fc9b56de4040
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 14:52:11 2022 +0800

    set queryThreadCount=1 in IoTDBSelectIntoIT
---
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java | 2 +-
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java | 2 +-
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index e6b3057a2e..5a11ae1f8f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -44,7 +44,7 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 23796e317c..f37e12b588 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -42,7 +42,7 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index bbbaf1e330..5102d43e63 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -109,7 +109,7 @@ public class IoTDBSelectIntoIT {
   @BeforeClass
   public static void setUp() throws Exception {
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }


[iotdb] 04/04: refactor IntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 037d74836ba6061b770c5e3070545bd09ad7ae5e
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 17:12:26 2022 +0800

    refactor IntoOperator
---
 .../operator/process/AbstractIntoOperator.java     | 101 +++++++++++++++++----
 .../operator/process/DeviceViewIntoOperator.java   |  79 +++++++++++-----
 .../execution/operator/process/IntoOperator.java   |  44 ++++++---
 3 files changed, 171 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index f331b2ed87..9ec9b0d5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -38,6 +38,11 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,19 +50,33 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
+  protected TsBlock cachedTsBlock;
+
   protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   private DataNodeInternalClient client;
 
+  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
+
+  private final ListeningExecutorService writeOperationExecutor =
+      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  private ListenableFuture<TSStatus> writeOperationFuture;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -87,10 +106,21 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  protected void insertMultiTabletsInternally(boolean needCheck) {
+  protected boolean insertMultiTabletsInternally(boolean needCheck) {
+    InsertMultiTabletsStatement insertMultiTabletsStatement =
+        constructInsertMultiTabletsStatement(needCheck);
+    if (insertMultiTabletsStatement == null) {
+      return false;
+    }
+
+    executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+    return true;
+  }
+
+  protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) {
     if (insertTabletStatementGenerators == null
         || (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
-      return;
+      return null;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
@@ -100,28 +130,64 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
     }
     if (insertTabletStatementList.isEmpty()) {
-      return;
+      return null;
     }
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return insertMultiTabletsStatement;
+  }
 
+  protected void executeInsertMultiTabletsStatement(
+      InsertMultiTabletsStatement insertMultiTabletsStatement) {
     if (client == null) {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
-    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
-    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      String message =
-          String.format(
-              "Error occurred while inserting tablets in SELECT INTO: %s",
-              executionStatus.getMessage());
-      throw new IntoProcessException(message);
-    }
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
+    isBlocked = SettableFuture.create();
+    writeOperationFuture =
+        writeOperationExecutor.submit(
+            () -> {
+              LOGGER.info("");
+              return client.insertTablets(insertMultiTabletsStatement);
+            });
+
+    writeOperationFuture.addListener(
+        () -> {
+          LOGGER.info("");
+          ((SettableFuture<Void>) isBlocked).set(null);
+        },
+        writeOperationExecutor);
+  }
+
+  protected boolean handleFuture() {
+    if (writeOperationFuture != null) {
+      if (writeOperationFuture.isDone()) {
+        try {
+          TSStatus executionStatus = writeOperationFuture.get();
+          if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+            String message =
+                String.format(
+                    "Error occurred while inserting tablets in SELECT INTO: %s",
+                    executionStatus.getMessage());
+            throw new IntoProcessException(message);
+          }
+
+          for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+            generator.reset();
+          }
+
+          writeOperationFuture = null;
+          return true;
+        } catch (ExecutionException | InterruptedException e) {
+          throw new IntoProcessException(e.getMessage());
+        }
+      } else {
+        return false;
+      }
     }
+    return true;
   }
 
   private boolean existFullStatement(
@@ -164,12 +230,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
   }
 
   @Override
   public boolean hasNext() {
-    return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext();
+    return cachedTsBlock != null
+        || existNonEmptyStatement(insertTabletStatementGenerators)
+        || child.hasNext();
   }
 
   @Override
@@ -177,6 +245,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
+    writeOperationExecutor.shutdown();
     child.close();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 68329d56e1..7ef65c9e16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -75,39 +75,70 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() throws IntoProcessException {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
-      if (!Objects.equals(device, currentDevice)) {
-        insertMultiTabletsInternally(false);
-        updateResultTsBlock();
-
-        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
-        currentDevice = device;
-      }
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (IntoOperator.InsertTabletStatementGenerator generator :
-            insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+  public TsBlock next() {
+    if (!handleFuture()) {
+      return null;
+    }
+
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
     }
+    cachedTsBlock = null;
 
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
       updateResultTsBlock();
+      currentDevice = null;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        return null;
+      }
       return resultTsBlockBuilder.build();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+    if (!Objects.equals(device, currentDevice)) {
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
+      updateResultTsBlock();
+
+      insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+      currentDevice = device;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        cachedTsBlock = inputTsBlock;
+        return false;
+      }
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (IntoOperator.InsertTabletStatementGenerator generator :
+          insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void updateResultTsBlock() {
     if (currentDevice == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 4a34488779..9e5c46e9fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,28 +60,46 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+    if (!handleFuture()) {
+      return null;
     }
 
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
+    }
+    cachedTsBlock = null;
+
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      if (insertMultiTabletsInternally(false)) {
+        return null;
+      }
       return constructResultTsBlock();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private TsBlock constructResultTsBlock() {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.selectIntoColumnHeaders.stream()