You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/30 11:43:54 UTC

[iotdb] 03/04: finish

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3186ba814d70c886779b0bc6d5c20abc5160870a
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 19:02:03 2022 +0800

    finish
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  9 +++++
 .../fragment/FragmentInstanceContext.java          | 22 +++++++++--
 .../fragment/FragmentInstanceManager.java          | 16 +++++++-
 .../operator/process/AbstractIntoOperator.java     | 46 ++++++++++++++--------
 .../operator/process/DeviceViewIntoOperator.java   |  8 ++--
 .../execution/operator/process/IntoOperator.java   |  9 +++--
 .../plan/planner/LocalExecutionPlanContext.java    |  5 +++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  6 ++-
 9 files changed, 106 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 133211462a..6e3d88e739 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -683,6 +683,13 @@ public class IoTDBConfig {
    */
   private int selectIntoInsertTabletPlanRowLimit = 10000;
 
+  /**
+   * How many threads will be set up to execute into operations. When <= 0, use max(1, CPU core
+   * number / 2).
+   */
+  private int intoOperationSubmitThreadCount =
+      Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+
   /** Default TSfile storage is in local file system */
   private FSType tsFileStorageFs = FSType.LOCAL;
 
@@ -1897,6 +1904,14 @@ public class IoTDBConfig {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
+  public int getIntoOperationSubmitThreadCount() {
+    return intoOperationSubmitThreadCount;
+  }
+
+  public void setIntoOperationSubmitThreadCount(int intoOperationSubmitThreadCount) {
+    this.intoOperationSubmitThreadCount = intoOperationSubmitThreadCount;
+  }
+
   public int getCompactionWriteThroughputMbPerSec() {
     return compactionWriteThroughputMbPerSec;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dd3bb85a91..58e9287abe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -897,6 +897,15 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "select_into_insert_tablet_plan_row_limit",
                 String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+    conf.setIntoOperationSubmitThreadCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "into_operation_submit_thread_count",
+                String.valueOf(conf.getIntoOperationSubmitThreadCount()))));
+    if (conf.getIntoOperationSubmitThreadCount() <= 0) {
+      conf.setIntoOperationSubmitThreadCount(
+          Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+    }
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 69928dcec3..5a4ede9538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private ExecutorService intoOperationExecutor;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   private FragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
+    this.intoOperationExecutor = intoOperationExecutor;
   }
 
   @TestOnly
@@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext {
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     FragmentInstanceContext instanceContext =
         new FragmentInstanceContext(
-            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()));
+            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext {
   public Optional<Throwable> getFailureCause() {
     return Optional.ofNullable(stateMachine.getFailureCauses().peek());
   }
+
+  public ExecutorService getIntoOperationExecutor() {
+    return intoOperationExecutor;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index fbee7f88d9..b6cbc8690b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,6 +68,8 @@ public class FragmentInstanceManager {
   private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
+  private ExecutorService intoOperationExecutor;
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -79,6 +81,10 @@ public class FragmentInstanceManager {
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
+    this.intoOperationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getIntoOperationSubmitThreadCount(),
+            "into-operation-executor");
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
@@ -109,7 +115,10 @@ public class FragmentInstanceManager {
                         instanceId,
                         fragmentInstanceId ->
                             createFragmentInstanceContext(
-                                fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                                fragmentInstanceId,
+                                stateMachine,
+                                instance.getSessionInfo(),
+                                intoOperationExecutor));
 
                 try {
                   DataDriver driver =
@@ -165,7 +174,10 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
-                              fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getSessionInfo(),
+                              intoOperationExecutor));
 
               try {
                 SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 6181dab228..7d41985ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -37,10 +37,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -71,21 +69,20 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   private DataNodeInternalClient client;
 
-  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
-
-  private final ListeningExecutorService writeOperationExecutor =
-      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  private final ExecutorService writeOperationExecutor;
   private ListenableFuture<TSStatus> writeOperationFuture;
 
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+    this.writeOperationExecutor = intoOperationExecutor;
   }
 
   protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -144,14 +141,12 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
 
-    isBlocked = SettableFuture.create();
     writeOperationFuture =
-        writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
-    writeOperationFuture.addListener(
-        () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
+        Futures.submit(
+            () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
   }
 
-  protected boolean writeOperationDone() {
+  protected boolean handleWriteOperationFuture() {
     if (writeOperationFuture == null) {
       return true;
     }
@@ -220,9 +215,26 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  private boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
+
+    return writeOperationFuture.isDone();
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
-    return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    if (writeOperationDone() && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (!writeOperationDone() && childBlocked.isDone()) {
+      return writeOperationFuture;
+    } else if (writeOperationDone() && !childBlocked.isDone()) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+    }
   }
 
   @Override
@@ -237,7 +249,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
-    writeOperationExecutor.shutdown();
+    if (writeOperationFuture != null) {
+      writeOperationFuture.cancel(true);
+    }
     child.close();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e36bb49168..82214a562b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class DeviceViewIntoOperator extends AbstractIntoOperator {
@@ -60,8 +61,9 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
-    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -76,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!writeOperationDone()) {
+    if (!handleWriteOperationFuture()) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 67a4fb0b0e..9eb6c14886 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class IntoOperator extends AbstractIntoOperator {
@@ -48,19 +49,21 @@ public class IntoOperator extends AbstractIntoOperator {
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
   @Override
   public TsBlock next() {
-    if (!writeOperationDone()) {
+    if (!handleWriteOperationFuture()) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 4a3d844b73..e2497d44c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -169,4 +170,8 @@ public class LocalExecutionPlanContext {
   public long getDataRegionTTL() {
     return dataRegionTTL;
   }
+
+  public ExecutorService getIntoOperationExecutor() {
+    return instanceContext.getIntoOperationExecutor();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 484a342715..a6c2ebd261 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1383,7 +1383,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         targetPathToDataTypeMap,
         intoPathDescriptor.getTargetDeviceToAlignedMap(),
         intoPathDescriptor.getSourceTargetPathPairList(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor());
   }
 
   @Override
@@ -1433,7 +1434,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceToTargetPathDataTypeMap,
         deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor());
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {