You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/05 09:53:45 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 478b2120e4 [To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator
478b2120e4 is described below
commit 478b2120e4e1eb84ddbebbb706c0be6f39c8710d
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Mon Dec 5 17:53:38 2022 +0800
[To rel/1.0] [IOTDB-4978] Fix deadLock caused by blocked operation in IntoOperator
---
docs/UserGuide/Reference/Common-Config-Manual.md | 9 +
.../zh/UserGuide/Reference/Common-Config-Manual.md | 9 +
.../java/org/apache/iotdb/it/env/MppConfig.java | 9 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../iotdb/db/it/env/StandaloneEnvConfig.java | 14 ++
.../iotdb/db/it/selectinto/IoTDBSelectInto2IT.java | 3 +
.../iotdb/db/it/selectinto/IoTDBSelectInto3IT.java | 3 +
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 5 +
.../resources/conf/iotdb-common.properties | 4 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +
.../fragment/FragmentInstanceContext.java | 22 ++-
.../fragment/FragmentInstanceManager.java | 17 +-
.../operator/process/AbstractIntoOperator.java | 215 ++++++++++++++++-----
.../operator/process/DeviceViewIntoOperator.java | 105 ++++++----
.../execution/operator/process/IntoOperator.java | 48 +++--
.../plan/planner/LocalExecutionPlanContext.java | 5 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 53 ++++-
18 files changed, 436 insertions(+), 114 deletions(-)
diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 72d59afcf4..e2c79e093e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1384,6 +1384,15 @@ Different configuration parameters take effect in the following three ways:
| Default | 10000 |
| Effective | hot-load |
+* into\_operation\_execution\_thread\_count
+
+| Name | into\_operation\_execution\_thread\_count |
+| :---------: | :------------------------------------------------------------ |
+| Description | The number of threads in the thread pool that execute insert-tablet tasks |
+| Type | int32 |
+| Default | 2 |
+| Effective | After restarting system |
+
### Continuous Query
* continuous\_query\_execution\_thread
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index a78270d460..0c6c28fcfb 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1428,6 +1428,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
| 默认值 | 10000 |
| 改后生效方式 | 热加载 |
+* into\_operation\_execution\_thread\_count
+
+| 名字 | into\_operation\_execution\_thread\_count |
+| :---------: | :---------------------------------------- |
+| 描述 | SELECT INTO 中执行写入任务的线程池的线程数 |
+| 类型 | int32 |
+| 默认值 | 2 |
+| 改后生效方式 | 重启服务生效 |
+
### 连续查询配置
* continuous\_query\_submit\_thread\_count
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index a50d828dfb..df5535bdf8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -379,4 +379,13 @@ public class MppConfig implements BaseConfig {
"least_data_region_group_num", String.valueOf(leastDataRegionGroupNum));
return this;
}
+
+ @Override
+ public BaseConfig setQueryThreadCount(int queryThreadCount) {
+ if (queryThreadCount <= 0) {
+ queryThreadCount = Runtime.getRuntime().availableProcessors();
+ }
+ confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 741e2f2d5c..31def0bc8b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -404,4 +404,12 @@ public interface BaseConfig {
default int getLeastDataRegionGroupNum() {
return 5;
}
+
+ default BaseConfig setQueryThreadCount(int queryThreadCount) {
+ return this;
+ }
+
+ default int getQueryThreadCount() {
+ return Runtime.getRuntime().availableProcessors();
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index 32291cb01c..bb6aff4fb0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -337,4 +337,18 @@ public class StandaloneEnvConfig implements BaseConfig {
public int getSelectIntoInsertTabletPlanRowLimit() {
return IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
}
+
+ @Override
+ public BaseConfig setQueryThreadCount(int queryThreadCount) {
+ if (queryThreadCount <= 0) {
+ queryThreadCount = Runtime.getRuntime().availableProcessors();
+ }
+ IoTDBDescriptor.getInstance().getConfig().setQueryThreadCount(queryThreadCount);
+ return this;
+ }
+
+ @Override
+ public int getQueryThreadCount() {
+ return IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index 33db4ce761..5a11ae1f8f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -41,8 +41,10 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
selectIntoInsertTabletPlanRowLimit =
ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
numOfPointsPerPage = ConfigFactory.getConfig().getMaxNumberOfPointsInPage();
+ queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
+ ConfigFactory.getConfig().setQueryThreadCount(1);
EnvFactory.getEnv().initBeforeClass();
prepareData(SQLs);
}
@@ -53,5 +55,6 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
ConfigFactory.getConfig()
.setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
ConfigFactory.getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 12b8425f9d..f37e12b588 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -40,7 +40,9 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
public static void setUp() throws Exception {
selectIntoInsertTabletPlanRowLimit =
ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
+ ConfigFactory.getConfig().setQueryThreadCount(1);
EnvFactory.getEnv().initBeforeClass();
prepareData(SQLs);
}
@@ -50,5 +52,6 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
EnvFactory.getEnv().cleanAfterClass();
ConfigFactory.getConfig()
.setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
+ ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 6236fcd7cf..5102d43e63 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.it.selectinto;
+import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -47,6 +48,7 @@ public class IoTDBSelectIntoIT {
protected static int selectIntoInsertTabletPlanRowLimit;
protected static int numOfPointsPerPage;
+ protected static int queryThreadCount;
protected static final String[] SQLs =
new String[] {
@@ -106,6 +108,8 @@ public class IoTDBSelectIntoIT {
@BeforeClass
public static void setUp() throws Exception {
+ queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
+ ConfigFactory.getConfig().setQueryThreadCount(1);
EnvFactory.getEnv().initBeforeClass();
prepareData(SQLs);
}
@@ -113,6 +117,7 @@ public class IoTDBSelectIntoIT {
@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanAfterClass();
+ ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
}
// -------------------------------------- ALIGN BY TIME ---------------------------------------
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e29b1252b2..d932a8e9f3 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -877,6 +877,10 @@
# Datatype: int
# select_into_insert_tablet_plan_row_limit=10000
+# The number of threads in the thread pool that execute insert-tablet tasks
+# Datatype: int
+# into_operation_execution_thread_count=2
+
####################
### Continuous Query Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0f923aef2e..99dcf1b865 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -317,7 +317,7 @@ public class IoTDBConfig {
private int flushThreadCount = Runtime.getRuntime().availableProcessors();
/** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
- private int queryThreadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
+ private int queryThreadCount = Runtime.getRuntime().availableProcessors();
/** How many queries can be concurrently executed. When <= 0, use 1000. */
private int maxAllowedConcurrentQueries = 1000;
@@ -685,6 +685,9 @@ public class IoTDBConfig {
*/
private int selectIntoInsertTabletPlanRowLimit = 10000;
+ /** The number of threads in the thread pool that execute insert-tablet tasks. */
+ private int intoOperationExecutionThreadCount = 2;
+
/** Default TSfile storage is in local file system */
private FSType tsFileStorageFs = FSType.LOCAL;
@@ -1902,6 +1905,14 @@ public class IoTDBConfig {
return selectIntoInsertTabletPlanRowLimit;
}
+ public int getIntoOperationExecutionThreadCount() {
+ return intoOperationExecutionThreadCount;
+ }
+
+ public void setIntoOperationExecutionThreadCount(int intoOperationExecutionThreadCount) {
+ this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
+ }
+
public int getCompactionWriteThroughputMbPerSec() {
return compactionWriteThroughputMbPerSec;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bba0dd92c5..7a25d6d21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -903,6 +903,14 @@ public class IoTDBDescriptor {
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+ conf.setIntoOperationExecutionThreadCount(
+ Integer.parseInt(
+ properties.getProperty(
+ "into_operation_execution_thread_count",
+ String.valueOf(conf.getIntoOperationExecutionThreadCount()))));
+ if (conf.getIntoOperationExecutionThreadCount() <= 0) {
+ conf.setIntoOperationExecutionThreadCount(2);
+ }
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 69928dcec3..5a4ede9538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext {
// session info
private SessionInfo sessionInfo;
+ private ExecutorService intoOperationExecutor;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ SessionInfo sessionInfo,
+ ExecutorService intoOperationExecutor) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo);
+ new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext {
}
private FragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ SessionInfo sessionInfo,
+ ExecutorService intoOperationExecutor) {
this.id = id;
this.stateMachine = stateMachine;
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
this.sessionInfo = sessionInfo;
+ this.intoOperationExecutor = intoOperationExecutor;
}
@TestOnly
@@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext {
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
FragmentInstanceContext instanceContext =
new FragmentInstanceContext(
- id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()));
+ id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext {
public Optional<Throwable> getFailureCause() {
return Optional.ofNullable(stateMachine.getFailureCauses().peek());
}
+
+ public ExecutorService getIntoOperationExecutor() {
+ return intoOperationExecutor;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index db653ff690..f3da54b6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,6 +68,8 @@ public class FragmentInstanceManager {
private static final long QUERY_TIMEOUT_MS =
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+ private final ExecutorService intoOperationExecutor;
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -90,6 +92,11 @@ public class FragmentInstanceManager {
200,
200,
TimeUnit.MILLISECONDS);
+
+ this.intoOperationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
+ "into-operation-executor");
}
public FragmentInstanceInfo execDataQueryFragmentInstance(
@@ -109,7 +116,10 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ intoOperationExecutor));
try {
DataDriver driver =
@@ -165,7 +175,10 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ intoOperationExecutor));
try {
SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index f331b2ed87..5f4b8dda34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -37,7 +37,10 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,30 +48,162 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public abstract class AbstractIntoOperator implements ProcessOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
protected final OperatorContext operatorContext;
protected final Operator child;
+ protected TsBlock cachedTsBlock;
+
protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
private DataNodeInternalClient client;
+ private final ExecutorService writeOperationExecutor;
+ private ListenableFuture<TSStatus> writeOperationFuture;
+
+ protected boolean finished = false;
+
+ private final long maxRetainedSize;
+ private final long maxReturnSize;
+
public AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize) {
this.operatorContext = operatorContext;
this.child = child;
this.insertTabletStatementGenerators = insertTabletStatementGenerators;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+ this.writeOperationExecutor = intoOperationExecutor;
+
+ this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
+ this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ ListenableFuture<?> childBlocked = child.isBlocked();
+ boolean writeDone = writeOperationDone();
+ if (writeDone && childBlocked.isDone()) {
+ return NOT_BLOCKED;
+ } else if (childBlocked.isDone()) {
+ return writeOperationFuture;
+ } else if (writeDone) {
+ return childBlocked;
+ } else {
+ return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+ }
+ }
+
+ private boolean writeOperationDone() {
+ if (writeOperationFuture == null) {
+ return true;
+ }
+
+ return writeOperationFuture.isDone();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !finished;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (!checkLastWriteOperation()) {
+ return null;
+ }
+
+ if (!processTsBlock(cachedTsBlock)) {
+ return null;
+ }
+ cachedTsBlock = null;
+
+ if (child.hasNext()) {
+ TsBlock inputTsBlock = child.next();
+ processTsBlock(inputTsBlock);
+
+ // call child.next only once
+ return null;
+ } else {
+ return tryToReturnResultTsBlock();
+ }
+ }
+
+ /**
+ * Check whether the last write operation was executed successfully, and throw an exception if the
+ * execution failed, otherwise continue to execute the operator.
+ *
+ * @return true if the last write operation has been executed successfully.
+ */
+ private boolean checkLastWriteOperation() {
+ if (writeOperationFuture == null) {
+ return true;
+ }
+
+ try {
+ if (!writeOperationFuture.isDone()) {
+ throw new IllegalStateException(
+ "The operator cannot continue until the last write operation is done.");
+ }
+
+ TSStatus executionStatus = writeOperationFuture.get();
+ if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ String message =
+ String.format(
+ "Error occurred while inserting tablets in SELECT INTO: %s",
+ executionStatus.getMessage());
+ throw new IntoProcessException(message);
+ }
+
+ for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ generator.reset();
+ }
+
+ writeOperationFuture = null;
+ return true;
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ "{}: interrupted when processing write operation future with exception {}", this, e);
+ Thread.currentThread().interrupt();
+ throw new IntoProcessException(e.getMessage());
+ } catch (ExecutionException e) {
+ throw new IntoProcessException(e.getMessage());
+ }
}
+ /**
+ * Write the data of the input TsBlock into Statement.
+ *
+ * <p>If the Statement is full, submit one write task and return false.
+ *
+ * <p>If TsBlock is empty, or all data has been written to Statement, return true.
+ */
+ protected abstract boolean processTsBlock(TsBlock inputTsBlock);
+
+ protected abstract TsBlock tryToReturnResultTsBlock();
+
protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
@@ -87,10 +222,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
return insertTabletStatementGenerators;
}
- protected void insertMultiTabletsInternally(boolean needCheck) {
+ /** Return true if write task is submitted successfully. */
+ protected boolean insertMultiTabletsInternally(boolean needCheck) {
+ InsertMultiTabletsStatement insertMultiTabletsStatement =
+ constructInsertMultiTabletsStatement(needCheck);
+ if (insertMultiTabletsStatement == null) {
+ return false;
+ }
+
+ executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+ return true;
+ }
+
+ protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) {
if (insertTabletStatementGenerators == null
|| (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
- return;
+ return null;
}
List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
@@ -100,28 +247,23 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
}
if (insertTabletStatementList.isEmpty()) {
- return;
+ return null;
}
InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+ return insertMultiTabletsStatement;
+ }
+ protected void executeInsertMultiTabletsStatement(
+ InsertMultiTabletsStatement insertMultiTabletsStatement) {
if (client == null) {
client = new DataNodeInternalClient(operatorContext.getSessionInfo());
}
- TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
- if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- String message =
- String.format(
- "Error occurred while inserting tablets in SELECT INTO: %s",
- executionStatus.getMessage());
- throw new IntoProcessException(message);
- }
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- generator.reset();
- }
+ writeOperationFuture =
+ Futures.submit(
+ () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
}
private boolean existFullStatement(
@@ -134,19 +276,6 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
return false;
}
- private boolean existNonEmptyStatement(
- List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
- if (insertTabletStatementGenerators == null) {
- return false;
- }
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- if (generator != null && !generator.isEmpty()) {
- return true;
- }
- }
- return false;
- }
-
protected int findWritten(String device, String measurement) {
for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
if (!Objects.equals(generator.getDevice(), device)) {
@@ -158,18 +287,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
@Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
- }
-
- @Override
- public boolean hasNext() {
- return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext();
+ public boolean isFinished() {
+ return finished;
}
@Override
@@ -177,27 +296,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
if (client != null) {
client.close();
}
+ if (writeOperationFuture != null) {
+ writeOperationFuture.cancel(true);
+ }
child.close();
}
- @Override
- public boolean isFinished() {
- return !hasNext();
- }
-
@Override
public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemory();
+ return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory();
}
@Override
public long calculateMaxReturnSize() {
- return child.calculateMaxReturnSize();
+ return maxReturnSize;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return child.calculateRetainedSizeAfterCallingNext();
+ return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
}
public static class InsertTabletStatementGenerator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 68329d56e1..ebe8bbe64d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,12 +20,12 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.IntoProcessException;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class DeviceViewIntoOperator extends AbstractIntoOperator {
@@ -60,8 +61,16 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
Map<String, Boolean> targetDeviceToAlignedMap,
Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
- super(operatorContext, child, null, sourceColumnToInputLocationMap);
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize) {
+ super(
+ operatorContext,
+ child,
+ null,
+ sourceColumnToInputLocationMap,
+ intoOperationExecutor,
+ maxStatementSize);
this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -75,37 +84,67 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
}
@Override
- public TsBlock next() throws IntoProcessException {
- TsBlock inputTsBlock = child.next();
- if (inputTsBlock != null) {
- String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
- if (!Objects.equals(device, currentDevice)) {
- insertMultiTabletsInternally(false);
- updateResultTsBlock();
-
- insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
- currentDevice = device;
+ protected boolean processTsBlock(TsBlock inputTsBlock) {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return true;
+ }
+
+ String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+ if (!Objects.equals(device, currentDevice)) {
+ InsertMultiTabletsStatement insertMultiTabletsStatement =
+ constructInsertMultiTabletsStatement(false);
+ updateResultTsBlock();
+
+ insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+ currentDevice = device;
+
+ if (insertMultiTabletsStatement != null) {
+ executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+ cachedTsBlock = inputTsBlock;
+ return false;
}
- int readIndex = 0;
- while (readIndex < inputTsBlock.getPositionCount()) {
- int lastReadIndex = readIndex;
- for (IntoOperator.InsertTabletStatementGenerator generator :
- insertTabletStatementGenerators) {
- lastReadIndex =
- Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
- }
- readIndex = lastReadIndex;
- insertMultiTabletsInternally(true);
+ }
+
+ int readIndex = 0;
+ while (readIndex < inputTsBlock.getPositionCount()) {
+ int lastReadIndex = readIndex;
+ for (IntoOperator.InsertTabletStatementGenerator generator :
+ insertTabletStatementGenerators) {
+ lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+ }
+ readIndex = lastReadIndex;
+ if (insertMultiTabletsInternally(true)) {
+ cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+ return false;
}
}
+ return true;
+ }
- if (child.hasNext()) {
+ @Override
+ protected TsBlock tryToReturnResultTsBlock() {
+ InsertMultiTabletsStatement insertMultiTabletsStatement =
+ constructInsertMultiTabletsStatement(false);
+ updateResultTsBlock();
+ currentDevice = null;
+
+ if (insertMultiTabletsStatement != null) {
+ executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
return null;
- } else {
- insertMultiTabletsInternally(false);
- updateResultTsBlock();
- return resultTsBlockBuilder.build();
}
+
+ finished = true;
+ return resultTsBlockBuilder.build();
+ }
+
+ private List<IntoOperator.InsertTabletStatementGenerator>
+ constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+ deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+ deviceToTargetPathDataTypeMap.get(currentDevice);
+ return constructInsertTabletStatementGenerators(
+ targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
}
private void updateResultTsBlock() {
@@ -127,14 +166,4 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
resultTsBlockBuilder.declarePosition();
}
}
-
- private List<IntoOperator.InsertTabletStatementGenerator>
- constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
- deviceToTargetPathSourceInputLocationMap.get(currentDevice);
- Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
- deviceToTargetPathDataTypeMap.get(currentDevice);
- return constructInsertTabletStatementGenerators(
- targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 4a34488779..28488d5ecb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class IntoOperator extends AbstractIntoOperator {
@@ -48,38 +49,49 @@ public class IntoOperator extends AbstractIntoOperator {
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
Map<String, Boolean> targetDeviceToAlignedMap,
List<Pair<String, PartialPath>> sourceTargetPathPairList,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize) {
super(
operatorContext,
child,
constructInsertTabletStatementGenerators(
targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ intoOperationExecutor,
+ maxStatementSize);
this.sourceTargetPathPairList = sourceTargetPathPairList;
}
@Override
- public TsBlock next() {
- TsBlock inputTsBlock = child.next();
- if (inputTsBlock != null) {
- int readIndex = 0;
- while (readIndex < inputTsBlock.getPositionCount()) {
- int lastReadIndex = readIndex;
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- lastReadIndex =
- Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
- }
- readIndex = lastReadIndex;
- insertMultiTabletsInternally(true);
+ protected boolean processTsBlock(TsBlock inputTsBlock) {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return true;
+ }
+
+ int readIndex = 0;
+ while (readIndex < inputTsBlock.getPositionCount()) {
+ int lastReadIndex = readIndex;
+ for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+ }
+ readIndex = lastReadIndex;
+ if (insertMultiTabletsInternally(true)) {
+ cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+ return false;
}
}
+ return true;
+ }
- if (child.hasNext()) {
+ @Override
+ protected TsBlock tryToReturnResultTsBlock() {
+ if (insertMultiTabletsInternally(false)) {
return null;
- } else {
- insertMultiTabletsInternally(false);
- return constructResultTsBlock();
}
+
+ finished = true;
+ return constructResultTsBlock();
}
private TsBlock constructResultTsBlock() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 4a3d844b73..e2497d44c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -169,4 +170,8 @@ public class LocalExecutionPlanContext {
public long getDataRegionTTL() {
return dataRegionTTL;
}
+
+ public ExecutorService getIntoOperationExecutor() {
+ return instanceContext.getIntoOperationExecutor();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index aea6df9299..e0438e2668 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -172,6 +173,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
@@ -1377,7 +1379,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
sourceColumnToInputLocationMap,
context.getTypeProvider());
+ int rowLimit =
+ IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
return new IntoOperator(
operatorContext,
child,
@@ -1385,7 +1392,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
targetPathToDataTypeMap,
intoPathDescriptor.getTargetDeviceToAlignedMap(),
intoPathDescriptor.getSourceTargetPathPairList(),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ context.getIntoOperationExecutor(),
+ maxStatementSize);
}
@Override
@@ -1410,6 +1419,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
new HashMap<>();
Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+ long statementSizePerLine = 0L;
for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
sourceDeviceToTargetPathMap.entrySet()) {
String sourceDevice = deviceEntry.getKey();
@@ -1425,8 +1435,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
deviceToTargetPathSourceInputLocationMap.put(
sourceDevice, targetPathToSourceInputLocationMap);
deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+ statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap);
}
+ int rowLimit =
+ IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ long maxStatementSize = statementSizePerLine * rowLimit;
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new DeviceViewIntoOperator(
operatorContext,
@@ -1435,7 +1450,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
deviceToTargetPathDataTypeMap,
deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ context.getIntoOperationExecutor(),
+ maxStatementSize);
}
private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
@@ -1469,6 +1486,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
}
+ private long calculateStatementSizePerLine(
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
+ long maxStatementSize = Long.BYTES;
+ List<TSDataType> dataTypes =
+ targetPathToDataTypeMap.values().stream()
+ .flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream())
+ .collect(Collectors.toList());
+ for (TSDataType dataType : dataTypes) {
+ maxStatementSize += getValueSizePerLine(dataType);
+ }
+ return maxStatementSize;
+ }
+
+ private static long getValueSizePerLine(TSDataType tsDataType) {
+ switch (tsDataType) {
+ case INT32:
+ return Integer.BYTES;
+ case INT64:
+ return Long.BYTES;
+ case FLOAT:
+ return Float.BYTES;
+ case DOUBLE:
+ return Double.BYTES;
+ case BOOLEAN:
+ return Byte.BYTES;
+ case TEXT:
+ return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath());
+ default:
+ throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+ }
+ }
+
@Override
public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children =