You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/30 11:43:54 UTC
[iotdb] 03/04: finish
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3186ba814d70c886779b0bc6d5c20abc5160870a
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 19:02:03 2022 +0800
finish
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +++++
.../fragment/FragmentInstanceContext.java | 22 +++++++++--
.../fragment/FragmentInstanceManager.java | 16 +++++++-
.../operator/process/AbstractIntoOperator.java | 46 ++++++++++++++--------
.../operator/process/DeviceViewIntoOperator.java | 8 ++--
.../execution/operator/process/IntoOperator.java | 9 +++--
.../plan/planner/LocalExecutionPlanContext.java | 5 +++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 6 ++-
9 files changed, 106 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 133211462a..6e3d88e739 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -683,6 +683,13 @@ public class IoTDBConfig {
*/
private int selectIntoInsertTabletPlanRowLimit = 10000;
+ /**
+ * How many thread will be set up to execute into operation. When <= 0, use max(1, CPU core number
+ * / 2).
+ */
+ private int intoOperationSubmitThreadCount =
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+
/** Default TSfile storage is in local file system */
private FSType tsFileStorageFs = FSType.LOCAL;
@@ -1897,6 +1904,14 @@ public class IoTDBConfig {
return selectIntoInsertTabletPlanRowLimit;
}
+ public int getIntoOperationSubmitThreadCount() {
+ return intoOperationSubmitThreadCount;
+ }
+
+ public void setIntoOperationSubmitThreadCount(int intoOperationSubmitThreadCount) {
+ this.intoOperationSubmitThreadCount = intoOperationSubmitThreadCount;
+ }
+
public int getCompactionWriteThroughputMbPerSec() {
return compactionWriteThroughputMbPerSec;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dd3bb85a91..58e9287abe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -897,6 +897,15 @@ public class IoTDBDescriptor {
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+ conf.setIntoOperationSubmitThreadCount(
+ Integer.parseInt(
+ properties.getProperty(
+ "into_operation_submit_thread_count",
+ String.valueOf(conf.getIntoOperationSubmitThreadCount()))));
+ if (conf.getIntoOperationSubmitThreadCount() <= 0) {
+ conf.setIntoOperationSubmitThreadCount(
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ }
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 69928dcec3..5a4ede9538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext {
// session info
private SessionInfo sessionInfo;
+ private ExecutorService intoOperationExecutor;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ SessionInfo sessionInfo,
+ ExecutorService intoOperationExecutor) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo);
+ new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext {
}
private FragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ SessionInfo sessionInfo,
+ ExecutorService intoOperationExecutor) {
this.id = id;
this.stateMachine = stateMachine;
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
this.sessionInfo = sessionInfo;
+ this.intoOperationExecutor = intoOperationExecutor;
}
@TestOnly
@@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext {
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
FragmentInstanceContext instanceContext =
new FragmentInstanceContext(
- id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()));
+ id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext {
public Optional<Throwable> getFailureCause() {
return Optional.ofNullable(stateMachine.getFailureCauses().peek());
}
+
+ public ExecutorService getIntoOperationExecutor() {
+ return intoOperationExecutor;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index fbee7f88d9..b6cbc8690b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,6 +68,8 @@ public class FragmentInstanceManager {
private static final long QUERY_TIMEOUT_MS =
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+ private ExecutorService intoOperationExecutor;
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -79,6 +81,10 @@ public class FragmentInstanceManager {
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
this.instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
+ this.intoOperationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getIntoOperationSubmitThreadCount(),
+ "into-operation-executor");
this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
@@ -109,7 +115,10 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ intoOperationExecutor));
try {
DataDriver driver =
@@ -165,7 +174,10 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ intoOperationExecutor));
try {
SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 6181dab228..7d41985ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -37,10 +37,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -71,21 +69,20 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
private DataNodeInternalClient client;
- private ListenableFuture<?> isBlocked = NOT_BLOCKED;
-
- private final ListeningExecutorService writeOperationExecutor =
- MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ private final ExecutorService writeOperationExecutor;
private ListenableFuture<TSStatus> writeOperationFuture;
public AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor) {
this.operatorContext = operatorContext;
this.child = child;
this.insertTabletStatementGenerators = insertTabletStatementGenerators;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+ this.writeOperationExecutor = intoOperationExecutor;
}
protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -144,14 +141,12 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
client = new DataNodeInternalClient(operatorContext.getSessionInfo());
}
- isBlocked = SettableFuture.create();
writeOperationFuture =
- writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
- writeOperationFuture.addListener(
- () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
+ Futures.submit(
+ () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
}
- protected boolean writeOperationDone() {
+ protected boolean handleWriteOperationFuture() {
if (writeOperationFuture == null) {
return true;
}
@@ -220,9 +215,26 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
return operatorContext;
}
+ private boolean writeOperationDone() {
+ if (writeOperationFuture == null) {
+ return true;
+ }
+
+ return writeOperationFuture.isDone();
+ }
+
@Override
public ListenableFuture<?> isBlocked() {
- return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
+ ListenableFuture<?> childBlocked = child.isBlocked();
+ if (writeOperationDone() && childBlocked.isDone()) {
+ return NOT_BLOCKED;
+ } else if (!writeOperationDone() && childBlocked.isDone()) {
+ return writeOperationFuture;
+ } else if (writeOperationDone() && !childBlocked.isDone()) {
+ return childBlocked;
+ } else {
+ return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+ }
}
@Override
@@ -237,7 +249,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
if (client != null) {
client.close();
}
- writeOperationExecutor.shutdown();
+ if (writeOperationFuture != null) {
+ writeOperationFuture.cancel(true);
+ }
child.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e36bb49168..82214a562b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class DeviceViewIntoOperator extends AbstractIntoOperator {
@@ -60,8 +61,9 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
Map<String, Boolean> targetDeviceToAlignedMap,
Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
- super(operatorContext, child, null, sourceColumnToInputLocationMap);
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor) {
+ super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -76,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
@Override
public TsBlock next() {
- if (!writeOperationDone()) {
+ if (!handleWriteOperationFuture()) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 67a4fb0b0e..9eb6c14886 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class IntoOperator extends AbstractIntoOperator {
@@ -48,19 +49,21 @@ public class IntoOperator extends AbstractIntoOperator {
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
Map<String, Boolean> targetDeviceToAlignedMap,
List<Pair<String, PartialPath>> sourceTargetPathPairList,
- Map<String, InputLocation> sourceColumnToInputLocationMap) {
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor) {
super(
operatorContext,
child,
constructInsertTabletStatementGenerators(
targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ intoOperationExecutor);
this.sourceTargetPathPairList = sourceTargetPathPairList;
}
@Override
public TsBlock next() {
- if (!writeOperationDone()) {
+ if (!handleWriteOperationFuture()) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 4a3d844b73..e2497d44c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -169,4 +170,8 @@ public class LocalExecutionPlanContext {
public long getDataRegionTTL() {
return dataRegionTTL;
}
+
+ public ExecutorService getIntoOperationExecutor() {
+ return instanceContext.getIntoOperationExecutor();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 484a342715..a6c2ebd261 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1383,7 +1383,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
targetPathToDataTypeMap,
intoPathDescriptor.getTargetDeviceToAlignedMap(),
intoPathDescriptor.getSourceTargetPathPairList(),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ context.getIntoOperationExecutor());
}
@Override
@@ -1433,7 +1434,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
deviceToTargetPathDataTypeMap,
deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
- sourceColumnToInputLocationMap);
+ sourceColumnToInputLocationMap,
+ context.getIntoOperationExecutor());
}
private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {