You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/05 09:53:45 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 478b2120e4 [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator
478b2120e4 is described below

commit 478b2120e4e1eb84ddbebbb706c0be6f39c8710d
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Mon Dec 5 17:53:38 2022 +0800

    [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator
---
 docs/UserGuide/Reference/Common-Config-Manual.md   |   9 +
 .../zh/UserGuide/Reference/Common-Config-Manual.md |   9 +
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   9 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +
 .../iotdb/db/it/env/StandaloneEnvConfig.java       |  14 ++
 .../iotdb/db/it/selectinto/IoTDBSelectInto2IT.java |   3 +
 .../iotdb/db/it/selectinto/IoTDBSelectInto3IT.java |   3 +
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  |   5 +
 .../resources/conf/iotdb-common.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   8 +
 .../fragment/FragmentInstanceContext.java          |  22 ++-
 .../fragment/FragmentInstanceManager.java          |  17 +-
 .../operator/process/AbstractIntoOperator.java     | 215 ++++++++++++++++-----
 .../operator/process/DeviceViewIntoOperator.java   | 105 ++++++----
 .../execution/operator/process/IntoOperator.java   |  48 +++--
 .../plan/planner/LocalExecutionPlanContext.java    |   5 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  53 ++++-
 18 files changed, 436 insertions(+), 114 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 72d59afcf4..e2c79e093e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1384,6 +1384,15 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 10000                                                        |
 |  Effective  | hot-load                                                      |
 
+* into\_operation\_execution\_thread\_count
+
+|    Name     | into\_operation\_execution\_thread\_count                     |
+| :---------: | :------------------------------------------------------------ |
+| Description | The number of threads in the thread pool that execute insert-tablet tasks |
+|    Type     | int32                                                         |
+|   Default   | 2                                                             |
+|  Effective  | After restarting system                                       |
+
 ### Continuous Query
 
 * continuous\_query\_execution\_thread
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index a78270d460..0c6c28fcfb 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1428,6 +1428,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 10000                                                        |
 | 改后生效方式 | 热加载                                                     |
 
+* into\_operation\_execution\_thread\_count
+
+|     名字     | into\_operation\_execution\_thread\_count |
+| :---------: | :---------------------------------------- |
+|     描述     | SELECT INTO 中执行写入任务的线程池的线程数      |
+|     类型     | int32                                     |
+|    默认值    | 2                                         |
+| 改后生效方式  | 重启服务生效                                 |
+
 ### 连续查询配置
 
 * continuous\_query\_submit\_thread\_count
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index a50d828dfb..df5535bdf8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -379,4 +379,13 @@ public class MppConfig implements BaseConfig {
         "least_data_region_group_num", String.valueOf(leastDataRegionGroupNum));
     return this;
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 741e2f2d5c..31def0bc8b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -404,4 +404,12 @@ public interface BaseConfig {
   default int getLeastDataRegionGroupNum() {
     return 5;
   }
+
+  default BaseConfig setQueryThreadCount(int queryThreadCount) {
+    return this;
+  }
+
+  default int getQueryThreadCount() {
+    return Runtime.getRuntime().availableProcessors();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index 32291cb01c..bb6aff4fb0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -337,4 +337,18 @@ public class StandaloneEnvConfig implements BaseConfig {
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    IoTDBDescriptor.getInstance().getConfig().setQueryThreadCount(queryThreadCount);
+    return this;
+  }
+
+  @Override
+  public int getQueryThreadCount() {
+    return IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index 33db4ce761..5a11ae1f8f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -41,8 +41,10 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
     numOfPointsPerPage = ConfigFactory.getConfig().getMaxNumberOfPointsInPage();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -53,5 +55,6 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 12b8425f9d..f37e12b588 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -40,7 +40,9 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
   public static void setUp() throws Exception {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -50,5 +52,6 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
     EnvFactory.getEnv().cleanAfterClass();
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 6236fcd7cf..5102d43e63 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.it.selectinto;
 
+import org.apache.iotdb.it.env.ConfigFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
@@ -47,6 +48,7 @@ public class IoTDBSelectIntoIT {
 
   protected static int selectIntoInsertTabletPlanRowLimit;
   protected static int numOfPointsPerPage;
+  protected static int queryThreadCount;
 
   protected static final String[] SQLs =
       new String[] {
@@ -106,6 +108,8 @@ public class IoTDBSelectIntoIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -113,6 +117,7 @@ public class IoTDBSelectIntoIT {
   @AfterClass
   public static void tearDown() throws Exception {
     EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 
   // -------------------------------------- ALIGN BY TIME ---------------------------------------
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e29b1252b2..d932a8e9f3 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -877,6 +877,10 @@
 # Datatype: int
 # select_into_insert_tablet_plan_row_limit=10000
 
+# The number of threads in the thread pool that execute insert-tablet tasks
+# Datatype: int
+# into_operation_execution_thread_count=2
+
 ####################
 ### Continuous Query Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0f923aef2e..99dcf1b865 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -317,7 +317,7 @@ public class IoTDBConfig {
   private int flushThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
-  private int queryThreadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
+  private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;
@@ -685,6 +685,9 @@ public class IoTDBConfig {
    */
   private int selectIntoInsertTabletPlanRowLimit = 10000;
 
+  /** The number of threads in the thread pool that execute insert-tablet tasks. */
+  private int intoOperationExecutionThreadCount = 2;
+
   /** Default TSfile storage is in local file system */
   private FSType tsFileStorageFs = FSType.LOCAL;
 
@@ -1902,6 +1905,14 @@ public class IoTDBConfig {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
+  public int getIntoOperationExecutionThreadCount() {
+    return intoOperationExecutionThreadCount;
+  }
+
+  public void setIntoOperationExecutionThreadCount(int intoOperationExecutionThreadCount) {
+    this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
+  }
+
   public int getCompactionWriteThroughputMbPerSec() {
     return compactionWriteThroughputMbPerSec;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bba0dd92c5..7a25d6d21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -903,6 +903,14 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "select_into_insert_tablet_plan_row_limit",
                 String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+    conf.setIntoOperationExecutionThreadCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "into_operation_execution_thread_count",
+                String.valueOf(conf.getIntoOperationExecutionThreadCount()))));
+    if (conf.getIntoOperationExecutionThreadCount() <= 0) {
+      conf.setIntoOperationExecutionThreadCount(2);
+    }
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 69928dcec3..5a4ede9538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private ExecutorService intoOperationExecutor;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   private FragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
+    this.intoOperationExecutor = intoOperationExecutor;
   }
 
   @TestOnly
@@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext {
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     FragmentInstanceContext instanceContext =
         new FragmentInstanceContext(
-            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()));
+            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext {
   public Optional<Throwable> getFailureCause() {
     return Optional.ofNullable(stateMachine.getFailureCauses().peek());
   }
+
+  public ExecutorService getIntoOperationExecutor() {
+    return intoOperationExecutor;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index db653ff690..f3da54b6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,6 +68,8 @@ public class FragmentInstanceManager {
   private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
+  private final ExecutorService intoOperationExecutor;
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -90,6 +92,11 @@ public class FragmentInstanceManager {
         200,
         200,
         TimeUnit.MILLISECONDS);
+
+    this.intoOperationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
+            "into-operation-executor");
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
@@ -109,7 +116,10 @@ public class FragmentInstanceManager {
                         instanceId,
                         fragmentInstanceId ->
                             createFragmentInstanceContext(
-                                fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                                fragmentInstanceId,
+                                stateMachine,
+                                instance.getSessionInfo(),
+                                intoOperationExecutor));
 
                 try {
                   DataDriver driver =
@@ -165,7 +175,10 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
-                              fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getSessionInfo(),
+                              intoOperationExecutor));
 
               try {
                 SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index f331b2ed87..5f4b8dda34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -37,7 +37,10 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,30 +48,162 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
+  protected TsBlock cachedTsBlock;
+
   protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   private DataNodeInternalClient client;
 
+  private final ExecutorService writeOperationExecutor;
+  private ListenableFuture<TSStatus> writeOperationFuture;
+
+  protected boolean finished = false;
+
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+    this.writeOperationExecutor = intoOperationExecutor;
+
+    this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
+    this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    boolean writeDone = writeOperationDone();
+    if (writeDone && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (childBlocked.isDone()) {
+      return writeOperationFuture;
+    } else if (writeDone) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+    }
+  }
+
+  private boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
+
+    return writeOperationFuture.isDone();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !finished;
+  }
+
+  @Override
+  public TsBlock next() {
+    if (!checkLastWriteOperation()) {
+      return null;
+    }
+
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
+    }
+    cachedTsBlock = null;
+
+    if (child.hasNext()) {
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // call child.next only once
+      return null;
+    } else {
+      return tryToReturnResultTsBlock();
+    }
+  }
+
+  /**
+   * Check whether the last write operation was executed successfully, and throw an exception if the
+   * execution failed, otherwise continue to execute the operator.
+   *
+   * @return true if the last write operation has been executed successfully.
+   */
+  private boolean checkLastWriteOperation() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
+
+    try {
+      if (!writeOperationFuture.isDone()) {
+        throw new IllegalStateException(
+            "The operator cannot continue until the last write operation is done.");
+      }
+
+      TSStatus executionStatus = writeOperationFuture.get();
+      if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        String message =
+            String.format(
+                "Error occurred while inserting tablets in SELECT INTO: %s",
+                executionStatus.getMessage());
+        throw new IntoProcessException(message);
+      }
+
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        generator.reset();
+      }
+
+      writeOperationFuture = null;
+      return true;
+    } catch (InterruptedException e) {
+      LOGGER.warn(
+          "{}: interrupted when processing write operation future with exception {}", this, e);
+      Thread.currentThread().interrupt();
+      throw new IntoProcessException(e.getMessage());
+    } catch (ExecutionException e) {
+      throw new IntoProcessException(e.getMessage());
+    }
   }
 
+  /**
+   * Write the data of the input TsBlock into Statement.
+   *
+   * <p>If the Statement is full, submit one write task and return false.
+   *
+   * <p>If TsBlock is empty, or all data has been written to Statement, return true.
+   */
+  protected abstract boolean processTsBlock(TsBlock inputTsBlock);
+
+  protected abstract TsBlock tryToReturnResultTsBlock();
+
   protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
       Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
@@ -87,10 +222,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  protected void insertMultiTabletsInternally(boolean needCheck) {
+  /** Return true if write task is submitted successfully. */
+  protected boolean insertMultiTabletsInternally(boolean needCheck) {
+    InsertMultiTabletsStatement insertMultiTabletsStatement =
+        constructInsertMultiTabletsStatement(needCheck);
+    if (insertMultiTabletsStatement == null) {
+      return false;
+    }
+
+    executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+    return true;
+  }
+
+  protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) {
     if (insertTabletStatementGenerators == null
         || (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
-      return;
+      return null;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
@@ -100,28 +247,23 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
     }
     if (insertTabletStatementList.isEmpty()) {
-      return;
+      return null;
     }
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return insertMultiTabletsStatement;
+  }
 
+  protected void executeInsertMultiTabletsStatement(
+      InsertMultiTabletsStatement insertMultiTabletsStatement) {
     if (client == null) {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
-    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
-    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      String message =
-          String.format(
-              "Error occurred while inserting tablets in SELECT INTO: %s",
-              executionStatus.getMessage());
-      throw new IntoProcessException(message);
-    }
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
-    }
+    writeOperationFuture =
+        Futures.submit(
+            () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
   }
 
   private boolean existFullStatement(
@@ -134,19 +276,6 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return false;
   }
 
-  private boolean existNonEmptyStatement(
-      List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
-    if (insertTabletStatementGenerators == null) {
-      return false;
-    }
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      if (generator != null && !generator.isEmpty()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   protected int findWritten(String device, String measurement) {
     for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       if (!Objects.equals(generator.getDevice(), device)) {
@@ -158,18 +287,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   }
 
   @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
-  }
-
-  @Override
-  public boolean hasNext() {
-    return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext();
+  public boolean isFinished() {
+    return finished;
   }
 
   @Override
@@ -177,27 +296,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
+    if (writeOperationFuture != null) {
+      writeOperationFuture.cancel(true);
+    }
     child.close();
   }
 
-  @Override
-  public boolean isFinished() {
-    return !hasNext();
-  }
-
   @Override
   public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
+    return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
+    return maxReturnSize;
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
   }
 
   public static class InsertTabletStatementGenerator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 68329d56e1..ebe8bbe64d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class DeviceViewIntoOperator extends AbstractIntoOperator {
@@ -60,8 +61,16 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
-    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize) {
+    super(
+        operatorContext,
+        child,
+        null,
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor,
+        maxStatementSize);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -75,37 +84,67 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() throws IntoProcessException {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
-      if (!Objects.equals(device, currentDevice)) {
-        insertMultiTabletsInternally(false);
-        updateResultTsBlock();
-
-        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
-        currentDevice = device;
+  protected boolean processTsBlock(TsBlock inputTsBlock) { // true: block fully consumed; false: paused, rest kept in cachedTsBlock
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true; // nothing to consume
+    }
+
+    String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0)); // read from row 0 of the first value column
+    if (!Objects.equals(device, currentDevice)) { // device differs from the one handled so far
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false); // built with the previous device's generators still installed
+      updateResultTsBlock();
+
+      insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); // switch to the new device
+      currentDevice = device;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        cachedTsBlock = inputTsBlock; // no row consumed yet, so the whole block is kept
+        return false;
       }
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (IntoOperator.InsertTabletStatementGenerator generator :
-            insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (IntoOperator.InsertTabletStatementGenerator generator :
+          insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); // keep the largest index any generator reports
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) { // NOTE(review): true presumably means a write was started - confirm in AbstractIntoOperator
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex); // presumably the rows from readIndex onward - TODO confirm
+        return false;
       }
     }
+    return true; // loop ran to the end of the block
+  }
 
-    if (child.hasNext()) {
+  @Override
+  protected TsBlock tryToReturnResultTsBlock() { // result block, or null when a statement was handed off for execution instead
+    InsertMultiTabletsStatement insertMultiTabletsStatement =
+        constructInsertMultiTabletsStatement(false);
+    updateResultTsBlock();
+    currentDevice = null; // the next processTsBlock call will see any device as new
+
+    if (insertMultiTabletsStatement != null) {
+      executeInsertMultiTabletsStatement(insertMultiTabletsStatement); // finished stays unset on this path
       return null;
-    } else {
-      insertMultiTabletsInternally(false);
-      updateResultTsBlock();
-      return resultTsBlockBuilder.build();
     }
+
+    finished = true; // no statement was built, so the result can be returned now
+    return resultTsBlockBuilder.build();
+  }
+
+  private List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { // NOTE(review): parameter shadows the currentDevice field
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        deviceToTargetPathSourceInputLocationMap.get(currentDevice); // per-device lookup; null if the device has no entry
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+        deviceToTargetPathDataTypeMap.get(currentDevice);
+    return constructInsertTabletStatementGenerators(
+        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
   }
 
   private void updateResultTsBlock() {
@@ -127,14 +166,4 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       resultTsBlockBuilder.declarePosition();
     }
   }
-
-  private List<IntoOperator.InsertTabletStatementGenerator>
-      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
-    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
-        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
-    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
-        deviceToTargetPathDataTypeMap.get(currentDevice);
-    return constructInsertTabletStatementGenerators(
-        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 4a34488779..28488d5ecb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class IntoOperator extends AbstractIntoOperator {
@@ -48,38 +49,49 @@ public class IntoOperator extends AbstractIntoOperator {
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor,
+        maxStatementSize);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
   @Override
-  public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
+  protected boolean processTsBlock(TsBlock inputTsBlock) { // true: block fully consumed; false: paused, rest kept in cachedTsBlock
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true; // nothing to consume
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); // keep the largest index any generator reports
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) { // NOTE(review): true presumably means a write was started - confirm in AbstractIntoOperator
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex); // presumably the rows from readIndex onward - TODO confirm
+        return false;
       }
+    return true; // loop ran to the end of the block
+  }
 
-    if (child.hasNext()) {
+  @Override
+  protected TsBlock tryToReturnResultTsBlock() { // result block, or null when insertMultiTabletsInternally(false) returns true
+    if (insertMultiTabletsInternally(false)) { // NOTE(review): true presumably means a final write was started - confirm
       return null;
-    } else {
-      insertMultiTabletsInternally(false);
-      return constructResultTsBlock();
     }
+
+    finished = true; // set only on the path that returns the result block
+    return constructResultTsBlock();
   }
 
   private TsBlock constructResultTsBlock() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 4a3d844b73..e2497d44c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -169,4 +170,8 @@ public class LocalExecutionPlanContext {
   public long getDataRegionTTL() {
     return dataRegionTTL;
   }
+
+  public ExecutorService getIntoOperationExecutor() { // plain delegation to instanceContext
+    return instanceContext.getIntoOperationExecutor();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index aea6df9299..e0438e2668 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -172,6 +173,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
@@ -1377,7 +1379,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         sourceColumnToInputLocationMap,
         context.getTypeProvider());
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
     return new IntoOperator(
         operatorContext,
         child,
@@ -1385,7 +1392,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         targetPathToDataTypeMap,
         intoPathDescriptor.getTargetDeviceToAlignedMap(),
         intoPathDescriptor.getSourceTargetPathPairList(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor(),
+        maxStatementSize);
   }
 
   @Override
@@ -1410,6 +1419,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         new HashMap<>();
     Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
         deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+    long statementSizePerLine = 0L;
     for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
         sourceDeviceToTargetPathMap.entrySet()) {
       String sourceDevice = deviceEntry.getKey();
@@ -1425,8 +1435,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       deviceToTargetPathSourceInputLocationMap.put(
           sourceDevice, targetPathToSourceInputLocationMap);
       deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+      statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap);
     }
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = statementSizePerLine * rowLimit;
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
@@ -1435,7 +1450,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceToTargetPathDataTypeMap,
         deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor(),
+        maxStatementSize);
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
@@ -1469,6 +1486,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
   }
 
+  private long calculateStatementSizePerLine( // sums a per-row byte estimate over every target measurement's data type
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
+    long maxStatementSize = Long.BYTES; // despite its name this is a per-row size; the initial Long.BYTES is presumably the timestamp - TODO confirm
+    List<TSDataType> dataTypes =
+        targetPathToDataTypeMap.values().stream()
+            .flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream()) // flatten the types of all target paths
+            .collect(Collectors.toList());
+    for (TSDataType dataType : dataTypes) {
+      maxStatementSize += getValueSizePerLine(dataType);
+    }
+    return maxStatementSize;
+  }
+
+  private static long getValueSizePerLine(TSDataType tsDataType) { // byte size used for one value of the given type
+    switch (tsDataType) {
+      case INT32:
+        return Integer.BYTES;
+      case INT64:
+        return Long.BYTES;
+      case FLOAT:
+        return Float.BYTES;
+      case DOUBLE:
+        return Double.BYTES;
+      case BOOLEAN:
+        return Byte.BYTES; // a boolean is counted as one byte
+      case TEXT:
+        return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath()); // NOTE(review): empty PartialPath, so presumably not a per-series figure - confirm
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + tsDataType); // any type not listed above is rejected
+    }
+  }
+
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children =