You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 02:01:20 UTC

[iotdb] branch lmh/FixIntoOperator1.0 created (now 64da044e25)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 64da044e25 fix memory calculation

This branch includes the following new commits:

     new b14c10e302 revert queryThreadCount in IoTDBConfig
     new 03b5a5de5c add queryThreadCount config in IoTDBSelectIntoIT
     new 2c3f6c247d set queryThreadCount=1 in IoTDBSelectIntoIT
     new f6204fc62e refactor IntoOperator
     new 25bf877dbb fix
     new 80fa61e518 finish
     new e3582f766d add config
     new d7f5a13dc8 add some comments
     new 64da044e25 fix memory calculation

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 04/09: refactor IntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f6204fc62e761a58f0963663c9e4cffb454dd0bc
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 17:12:26 2022 +0800

    refactor IntoOperator
---
 .../operator/process/AbstractIntoOperator.java     | 101 +++++++++++++++++----
 .../operator/process/DeviceViewIntoOperator.java   |  79 +++++++++++-----
 .../execution/operator/process/IntoOperator.java   |  44 ++++++---
 3 files changed, 171 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index f331b2ed87..9ec9b0d5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -38,6 +38,11 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,19 +50,33 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
+  protected TsBlock cachedTsBlock;
+
   protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   private DataNodeInternalClient client;
 
+  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
+
+  private final ListeningExecutorService writeOperationExecutor =
+      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  private ListenableFuture<TSStatus> writeOperationFuture;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -87,10 +106,21 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  protected void insertMultiTabletsInternally(boolean needCheck) {
+  protected boolean insertMultiTabletsInternally(boolean needCheck) {
+    InsertMultiTabletsStatement insertMultiTabletsStatement =
+        constructInsertMultiTabletsStatement(needCheck);
+    if (insertMultiTabletsStatement == null) {
+      return false;
+    }
+
+    executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+    return true;
+  }
+
+  protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) {
     if (insertTabletStatementGenerators == null
         || (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
-      return;
+      return null;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
@@ -100,28 +130,64 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
     }
     if (insertTabletStatementList.isEmpty()) {
-      return;
+      return null;
     }
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return insertMultiTabletsStatement;
+  }
 
+  protected void executeInsertMultiTabletsStatement(
+      InsertMultiTabletsStatement insertMultiTabletsStatement) {
     if (client == null) {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
-    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
-    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      String message =
-          String.format(
-              "Error occurred while inserting tablets in SELECT INTO: %s",
-              executionStatus.getMessage());
-      throw new IntoProcessException(message);
-    }
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
+    isBlocked = SettableFuture.create();
+    writeOperationFuture =
+        writeOperationExecutor.submit(
+            () -> {
+              LOGGER.info("");
+              return client.insertTablets(insertMultiTabletsStatement);
+            });
+
+    writeOperationFuture.addListener(
+        () -> {
+          LOGGER.info("");
+          ((SettableFuture<Void>) isBlocked).set(null);
+        },
+        writeOperationExecutor);
+  }
+
+  protected boolean handleFuture() {
+    if (writeOperationFuture != null) {
+      if (writeOperationFuture.isDone()) {
+        try {
+          TSStatus executionStatus = writeOperationFuture.get();
+          if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+            String message =
+                String.format(
+                    "Error occurred while inserting tablets in SELECT INTO: %s",
+                    executionStatus.getMessage());
+            throw new IntoProcessException(message);
+          }
+
+          for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+            generator.reset();
+          }
+
+          writeOperationFuture = null;
+          return true;
+        } catch (ExecutionException | InterruptedException e) {
+          throw new IntoProcessException(e.getMessage());
+        }
+      } else {
+        return false;
+      }
     }
+    return true;
   }
 
   private boolean existFullStatement(
@@ -164,12 +230,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
   }
 
   @Override
   public boolean hasNext() {
-    return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext();
+    return cachedTsBlock != null
+        || existNonEmptyStatement(insertTabletStatementGenerators)
+        || child.hasNext();
   }
 
   @Override
@@ -177,6 +245,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
+    writeOperationExecutor.shutdown();
     child.close();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 68329d56e1..7ef65c9e16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -75,39 +75,70 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() throws IntoProcessException {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
-      if (!Objects.equals(device, currentDevice)) {
-        insertMultiTabletsInternally(false);
-        updateResultTsBlock();
-
-        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
-        currentDevice = device;
-      }
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (IntoOperator.InsertTabletStatementGenerator generator :
-            insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+  public TsBlock next() {
+    if (!handleFuture()) {
+      return null;
+    }
+
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
     }
+    cachedTsBlock = null;
 
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
       updateResultTsBlock();
+      currentDevice = null;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        return null;
+      }
       return resultTsBlockBuilder.build();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+    if (!Objects.equals(device, currentDevice)) {
+      InsertMultiTabletsStatement insertMultiTabletsStatement =
+          constructInsertMultiTabletsStatement(false);
+      updateResultTsBlock();
+
+      insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+      currentDevice = device;
+
+      if (insertMultiTabletsStatement != null) {
+        executeInsertMultiTabletsStatement(insertMultiTabletsStatement);
+        cachedTsBlock = inputTsBlock;
+        return false;
+      }
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (IntoOperator.InsertTabletStatementGenerator generator :
+          insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void updateResultTsBlock() {
     if (currentDevice == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 4a34488779..9e5c46e9fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,28 +60,46 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int readIndex = 0;
-      while (readIndex < inputTsBlock.getPositionCount()) {
-        int lastReadIndex = readIndex;
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex =
-              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
-        }
-        readIndex = lastReadIndex;
-        insertMultiTabletsInternally(true);
-      }
+    if (!handleFuture()) {
+      return null;
     }
 
+    if (!processTsBlock(cachedTsBlock)) {
+      return null;
+    }
+    cachedTsBlock = null;
+
     if (child.hasNext()) {
+      processTsBlock(child.next());
       return null;
     } else {
-      insertMultiTabletsInternally(false);
+      if (insertMultiTabletsInternally(false)) {
+        return null;
+      }
       return constructResultTsBlock();
     }
   }
 
+  private boolean processTsBlock(TsBlock inputTsBlock) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return true;
+    }
+
+    int readIndex = 0;
+    while (readIndex < inputTsBlock.getPositionCount()) {
+      int lastReadIndex = readIndex;
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+      }
+      readIndex = lastReadIndex;
+      if (insertMultiTabletsInternally(true)) {
+        cachedTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return false;
+      }
+    }
+    return true;
+  }
+
   private TsBlock constructResultTsBlock() {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.selectIntoColumnHeaders.stream()


[iotdb] 08/09: add some comments

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d7f5a13dc894ae224f7dd4e623e8e9bef0649da8
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 20:06:52 2022 +0800

    add some comments
---
 .../fragment/FragmentInstanceManager.java          | 11 ++++++-----
 .../operator/process/AbstractIntoOperator.java     |  8 +++-----
 .../operator/process/DeviceViewIntoOperator.java   | 23 +++++++++++-----------
 .../execution/operator/process/IntoOperator.java   |  3 ++-
 4 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1bc2444bb1..f3da54b6c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,7 +68,7 @@ public class FragmentInstanceManager {
   private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
-  private ExecutorService intoOperationExecutor;
+  private final ExecutorService intoOperationExecutor;
 
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
@@ -81,10 +81,6 @@ public class FragmentInstanceManager {
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
-    this.intoOperationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
-            "into-operation-executor");
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
@@ -96,6 +92,11 @@ public class FragmentInstanceManager {
         200,
         200,
         TimeUnit.MILLISECONDS);
+
+    this.intoOperationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
+            "into-operation-executor");
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 7d41985ece..dd117e6d97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -39,8 +39,6 @@ import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,8 +54,6 @@ import static com.google.common.util.concurrent.Futures.successfulAsList;
 
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
-
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
@@ -103,6 +99,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
+  /** Return true if write task is submitted successfully. */
   protected boolean insertMultiTabletsInternally(boolean needCheck) {
     InsertMultiTabletsStatement insertMultiTabletsStatement =
         constructInsertMultiTabletsStatement(needCheck);
@@ -146,7 +143,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
             () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
   }
 
-  protected boolean handleWriteOperationFuture() {
+  /** Return true if the previous write task has finished. */
+  protected boolean processWriteOperationFuture() {
     if (writeOperationFuture == null) {
       return true;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 82214a562b..07cd3c0a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -78,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleWriteOperationFuture()) {
+    if (!processWriteOperationFuture()) {
       return null;
     }
 
@@ -107,6 +107,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
     }
   }
 
+  private List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+        deviceToTargetPathDataTypeMap.get(currentDevice);
+    return constructInsertTabletStatementGenerators(
+        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+  }
+
+  /** Return false if a write task is submitted during processing, true otherwise. */
   private boolean processTsBlock(TsBlock inputTsBlock) {
     if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return true;
@@ -163,14 +174,4 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       resultTsBlockBuilder.declarePosition();
     }
   }
-
-  private List<IntoOperator.InsertTabletStatementGenerator>
-      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
-    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
-        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
-    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
-        deviceToTargetPathDataTypeMap.get(currentDevice);
-    return constructInsertTabletStatementGenerators(
-        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 9eb6c14886..5aba97e22a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -63,7 +63,7 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleWriteOperationFuture()) {
+    if (!processWriteOperationFuture()) {
       return null;
     }
 
@@ -86,6 +86,7 @@ public class IntoOperator extends AbstractIntoOperator {
     }
   }
 
+  /** Return false if a write task is submitted during processing, true otherwise. */
   private boolean processTsBlock(TsBlock inputTsBlock) {
     if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return true;


[iotdb] 07/09: add config

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e3582f766d0e596c37247ba0e2dff961329c8ba1
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 19:41:45 2022 +0800

    add config
---
 docs/UserGuide/Reference/Common-Config-Manual.md         |  9 +++++++++
 docs/zh/UserGuide/Reference/Common-Config-Manual.md      |  9 +++++++++
 .../src/assembly/resources/conf/iotdb-common.properties  |  4 ++++
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 16 ++++++----------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 11 +++++------
 .../mpp/execution/fragment/FragmentInstanceManager.java  |  2 +-
 6 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 504824ccdc..c75c4ac14e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1384,6 +1384,15 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 10000                                                        |
 |  Effective  | hot-load                                                      |
 
+* into\_operation\_execution\_thread\_count
+
+|    Name     | into\_operation\_execution\_thread\_count                     |
+| :---------: | :------------------------------------------------------------ |
+| Description | The number of threads in the thread pool that execute insert-tablet tasks |
+|    Type     | int32                                                         |
+|   Default   | 2                                                             |
+|  Effective  | After restarting system                                       |
+
 ### Continuous Query
 
 * continuous\_query\_execution\_thread
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 3fa5556310..e842f67bef 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1428,6 +1428,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 10000                                                        |
 | 改后生效方式 | 热加载                                                     |
 
+* into\_operation\_execution\_thread\_count
+
+|     名字     | into\_operation\_execution\_thread\_count |
+| :---------: | :---------------------------------------- |
+|     描述     | SELECT INTO 中执行写入任务的线程池的线程数      |
+|     类型     | int32                                     |
+|    默认值    | 2                                         |
+| 改后生效方式  | 重启服务生效                                 |
+
 ### 连续查询配置
 
 * continuous\_query\_submit\_thread\_count
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 930ab46ba2..b6689c3c1d 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -877,6 +877,10 @@
 # Datatype: int
 # select_into_insert_tablet_plan_row_limit=10000
 
+# The number of threads in the thread pool that execute insert-tablet tasks
+# Datatype: int
+# into_operation_execution_thread_count=2
+
 ####################
 ### Continuous Query Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 92abf21537..99dcf1b865 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -685,12 +685,8 @@ public class IoTDBConfig {
    */
   private int selectIntoInsertTabletPlanRowLimit = 10000;
 
-  /**
-   * How many thread will be set up to execute into operation. When <= 0, use max(1, CPU core number
-   * / 2).
-   */
-  private int intoOperationSubmitThreadCount =
-      Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+  /** The number of threads in the thread pool that execute insert-tablet tasks. */
+  private int intoOperationExecutionThreadCount = 2;
 
   /** Default TSfile storage is in local file system */
   private FSType tsFileStorageFs = FSType.LOCAL;
@@ -1909,12 +1905,12 @@ public class IoTDBConfig {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
-  public int getIntoOperationSubmitThreadCount() {
-    return intoOperationSubmitThreadCount;
+  public int getIntoOperationExecutionThreadCount() {
+    return intoOperationExecutionThreadCount;
   }
 
-  public void setIntoOperationSubmitThreadCount(int intoOperationSubmitThreadCount) {
-    this.intoOperationSubmitThreadCount = intoOperationSubmitThreadCount;
+  public void setIntoOperationExecutionThreadCount(int intoOperationExecutionThreadCount) {
+    this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
   }
 
   public int getCompactionWriteThroughputMbPerSec() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 9bab834045..7a25d6d21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -903,14 +903,13 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "select_into_insert_tablet_plan_row_limit",
                 String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
-    conf.setIntoOperationSubmitThreadCount(
+    conf.setIntoOperationExecutionThreadCount(
         Integer.parseInt(
             properties.getProperty(
-                "into_operation_submit_thread_count",
-                String.valueOf(conf.getIntoOperationSubmitThreadCount()))));
-    if (conf.getIntoOperationSubmitThreadCount() <= 0) {
-      conf.setIntoOperationSubmitThreadCount(
-          Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+                "into_operation_execution_thread_count",
+                String.valueOf(conf.getIntoOperationExecutionThreadCount()))));
+    if (conf.getIntoOperationExecutionThreadCount() <= 0) {
+      conf.setIntoOperationExecutionThreadCount(2);
     }
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 5114d2dcbb..1bc2444bb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -83,7 +83,7 @@ public class FragmentInstanceManager {
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
     this.intoOperationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            IoTDBDescriptor.getInstance().getConfig().getIntoOperationSubmitThreadCount(),
+            IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
             "into-operation-executor");
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);


[iotdb] 02/09: add queryThreadCount config in IoTDBSelectIntoIT

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 03b5a5de5cdd26962618e796932502dcea1df652
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 12:34:17 2022 +0800

    add queryThreadCount config in IoTDBSelectIntoIT
---
 .../src/main/java/org/apache/iotdb/it/env/MppConfig.java   |  9 +++++++++
 .../main/java/org/apache/iotdb/itbase/env/BaseConfig.java  |  8 ++++++++
 .../org/apache/iotdb/db/it/env/StandaloneEnvConfig.java    | 14 ++++++++++++++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java  |  3 +++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java  |  3 +++
 .../apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java   |  5 +++++
 6 files changed, 42 insertions(+)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index a50d828dfb..df5535bdf8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -379,4 +379,13 @@ public class MppConfig implements BaseConfig {
         "least_data_region_group_num", String.valueOf(leastDataRegionGroupNum));
     return this;
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 741e2f2d5c..31def0bc8b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -404,4 +404,12 @@ public interface BaseConfig {
   default int getLeastDataRegionGroupNum() {
     return 5;
   }
+
+  default BaseConfig setQueryThreadCount(int queryThreadCount) {
+    return this;
+  }
+
+  default int getQueryThreadCount() {
+    return Runtime.getRuntime().availableProcessors();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index 32291cb01c..bb6aff4fb0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -337,4 +337,18 @@ public class StandaloneEnvConfig implements BaseConfig {
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
   }
+
+  @Override
+  public BaseConfig setQueryThreadCount(int queryThreadCount) {
+    if (queryThreadCount <= 0) {
+      queryThreadCount = Runtime.getRuntime().availableProcessors();
+    }
+    IoTDBDescriptor.getInstance().getConfig().setQueryThreadCount(queryThreadCount);
+    return this;
+  }
+
+  @Override
+  public int getQueryThreadCount() {
+    return IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index 33db4ce761..e6b3057a2e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -41,8 +41,10 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
     numOfPointsPerPage = ConfigFactory.getConfig().getMaxNumberOfPointsInPage();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -53,5 +55,6 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 12b8425f9d..23796e317c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -40,7 +40,9 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
   public static void setUp() throws Exception {
     selectIntoInsertTabletPlanRowLimit =
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -50,5 +52,6 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
     EnvFactory.getEnv().cleanAfterClass();
     ConfigFactory.getConfig()
         .setSelectIntoInsertTabletPlanRowLimit(selectIntoInsertTabletPlanRowLimit);
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 6236fcd7cf..bbbaf1e330 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.it.selectinto;
 
+import org.apache.iotdb.it.env.ConfigFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
@@ -47,6 +48,7 @@ public class IoTDBSelectIntoIT {
 
   protected static int selectIntoInsertTabletPlanRowLimit;
   protected static int numOfPointsPerPage;
+  protected static int queryThreadCount;
 
   protected static final String[] SQLs =
       new String[] {
@@ -106,6 +108,8 @@ public class IoTDBSelectIntoIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
+    ConfigFactory.getConfig().setQueryThreadCount(2);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
@@ -113,6 +117,7 @@ public class IoTDBSelectIntoIT {
   @AfterClass
   public static void tearDown() throws Exception {
     EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig().setQueryThreadCount(queryThreadCount);
   }
 
   // -------------------------------------- ALIGN BY TIME ---------------------------------------


[iotdb] 03/09: set queryThreadCount=1 in IoTDBSelectIntoIT

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2c3f6c247d0f52ee212e73ba769ae8b41a7c1391
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 14:52:11 2022 +0800

    set queryThreadCount=1 in IoTDBSelectIntoIT
---
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java | 2 +-
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java | 2 +-
 .../test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
index e6b3057a2e..5a11ae1f8f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto2IT.java
@@ -44,7 +44,7 @@ public class IoTDBSelectInto2IT extends IoTDBSelectIntoIT {
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(8);
     ConfigFactory.getConfig().setMaxNumberOfPointsInPage(5);
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
index 23796e317c..f37e12b588 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectInto3IT.java
@@ -42,7 +42,7 @@ public class IoTDBSelectInto3IT extends IoTDBSelectIntoIT {
         ConfigFactory.getConfig().getSelectIntoInsertTabletPlanRowLimit();
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
     ConfigFactory.getConfig().setSelectIntoInsertTabletPlanRowLimit(5);
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index bbbaf1e330..5102d43e63 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -109,7 +109,7 @@ public class IoTDBSelectIntoIT {
   @BeforeClass
   public static void setUp() throws Exception {
     queryThreadCount = ConfigFactory.getConfig().getQueryThreadCount();
-    ConfigFactory.getConfig().setQueryThreadCount(2);
+    ConfigFactory.getConfig().setQueryThreadCount(1);
     EnvFactory.getEnv().initBeforeClass();
     prepareData(SQLs);
   }


[iotdb] 05/09: fix

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25bf877dbb03751c792772f1c56488b855578d32
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 22:09:37 2022 +0800

    fix
---
 .../operator/process/AbstractIntoOperator.java     | 62 ++++++++++------------
 .../operator/process/DeviceViewIntoOperator.java   |  7 ++-
 .../execution/operator/process/IntoOperator.java   |  7 ++-
 3 files changed, 37 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 9ec9b0d5bc..6181dab228 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -146,48 +146,40 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     isBlocked = SettableFuture.create();
     writeOperationFuture =
-        writeOperationExecutor.submit(
-            () -> {
-              LOGGER.info("");
-              return client.insertTablets(insertMultiTabletsStatement);
-            });
-
+        writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
     writeOperationFuture.addListener(
-        () -> {
-          LOGGER.info("");
-          ((SettableFuture<Void>) isBlocked).set(null);
-        },
-        writeOperationExecutor);
+        () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
   }
 
-  protected boolean handleFuture() {
-    if (writeOperationFuture != null) {
-      if (writeOperationFuture.isDone()) {
-        try {
-          TSStatus executionStatus = writeOperationFuture.get();
-          if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-              && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-            String message =
-                String.format(
-                    "Error occurred while inserting tablets in SELECT INTO: %s",
-                    executionStatus.getMessage());
-            throw new IntoProcessException(message);
-          }
-
-          for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-            generator.reset();
-          }
+  protected boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
 
-          writeOperationFuture = null;
-          return true;
-        } catch (ExecutionException | InterruptedException e) {
-          throw new IntoProcessException(e.getMessage());
-        }
-      } else {
+    try {
+      if (!writeOperationFuture.isDone()) {
         return false;
       }
+
+      TSStatus executionStatus = writeOperationFuture.get();
+      if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        String message =
+            String.format(
+                "Error occurred while inserting tablets in SELECT INTO: %s",
+                executionStatus.getMessage());
+        throw new IntoProcessException(message);
+      }
+
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        generator.reset();
+      }
+
+      writeOperationFuture = null;
+      return true;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IntoProcessException(e.getMessage());
     }
-    return true;
   }
 
   private boolean existFullStatement(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 7ef65c9e16..e36bb49168 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -76,7 +76,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleFuture()) {
+    if (!writeOperationDone()) {
       return null;
     }
 
@@ -86,7 +86,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
     cachedTsBlock = null;
 
     if (child.hasNext()) {
-      processTsBlock(child.next());
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // return here so that child.next() is called only once per next() invocation
       return null;
     } else {
       InsertMultiTabletsStatement insertMultiTabletsStatement =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 9e5c46e9fa..67a4fb0b0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,7 +60,7 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleFuture()) {
+    if (!writeOperationDone()) {
       return null;
     }
 
@@ -70,7 +70,10 @@ public class IntoOperator extends AbstractIntoOperator {
     cachedTsBlock = null;
 
     if (child.hasNext()) {
-      processTsBlock(child.next());
+      TsBlock inputTsBlock = child.next();
+      processTsBlock(inputTsBlock);
+
+      // return here so that child.next() is called only once per next() invocation
       return null;
     } else {
       if (insertMultiTabletsInternally(false)) {


[iotdb] 06/09: finish

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 80fa61e518eb8f71a2085a9b3559ee45b0abd303
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 19:02:03 2022 +0800

    finish
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  9 +++++
 .../fragment/FragmentInstanceContext.java          | 22 +++++++++--
 .../fragment/FragmentInstanceManager.java          | 16 +++++++-
 .../operator/process/AbstractIntoOperator.java     | 46 ++++++++++++++--------
 .../operator/process/DeviceViewIntoOperator.java   |  8 ++--
 .../execution/operator/process/IntoOperator.java   |  9 +++--
 .../plan/planner/LocalExecutionPlanContext.java    |  5 +++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  6 ++-
 9 files changed, 106 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c3c2da9f0d..92abf21537 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -685,6 +685,13 @@ public class IoTDBConfig {
    */
   private int selectIntoInsertTabletPlanRowLimit = 10000;
 
+  /**
+   * How many threads will be set up to execute into operations. When <= 0, use max(1, CPU core
+   * number / 2).
+   */
+  private int intoOperationSubmitThreadCount =
+      Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+
   /** Default TSfile storage is in local file system */
   private FSType tsFileStorageFs = FSType.LOCAL;
 
@@ -1902,6 +1909,14 @@ public class IoTDBConfig {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
+  public int getIntoOperationSubmitThreadCount() {
+    return intoOperationSubmitThreadCount;
+  }
+
+  public void setIntoOperationSubmitThreadCount(int intoOperationSubmitThreadCount) {
+    this.intoOperationSubmitThreadCount = intoOperationSubmitThreadCount;
+  }
+
   public int getCompactionWriteThroughputMbPerSec() {
     return compactionWriteThroughputMbPerSec;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bba0dd92c5..9bab834045 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -903,6 +903,15 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "select_into_insert_tablet_plan_row_limit",
                 String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+    conf.setIntoOperationSubmitThreadCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "into_operation_submit_thread_count",
+                String.valueOf(conf.getIntoOperationSubmitThreadCount()))));
+    if (conf.getIntoOperationSubmitThreadCount() <= 0) {
+      conf.setIntoOperationSubmitThreadCount(
+          Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+    }
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 69928dcec3..5a4ede9538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private ExecutorService intoOperationExecutor;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   private FragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      ExecutorService intoOperationExecutor) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
+    this.intoOperationExecutor = intoOperationExecutor;
   }
 
   @TestOnly
@@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext {
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     FragmentInstanceContext instanceContext =
         new FragmentInstanceContext(
-            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()));
+            id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null);
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext {
   public Optional<Throwable> getFailureCause() {
     return Optional.ofNullable(stateMachine.getFailureCauses().peek());
   }
+
+  public ExecutorService getIntoOperationExecutor() {
+    return intoOperationExecutor;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index db653ff690..5114d2dcbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,6 +68,8 @@ public class FragmentInstanceManager {
   private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
+  private ExecutorService intoOperationExecutor;
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -79,6 +81,10 @@ public class FragmentInstanceManager {
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
+    this.intoOperationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getIntoOperationSubmitThreadCount(),
+            "into-operation-executor");
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
@@ -109,7 +115,10 @@ public class FragmentInstanceManager {
                         instanceId,
                         fragmentInstanceId ->
                             createFragmentInstanceContext(
-                                fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                                fragmentInstanceId,
+                                stateMachine,
+                                instance.getSessionInfo(),
+                                intoOperationExecutor));
 
                 try {
                   DataDriver driver =
@@ -165,7 +174,10 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
-                              fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getSessionInfo(),
+                              intoOperationExecutor));
 
               try {
                 SchemaDriver driver =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 6181dab228..7d41985ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -37,10 +37,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -71,21 +69,20 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   private DataNodeInternalClient client;
 
-  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
-
-  private final ListeningExecutorService writeOperationExecutor =
-      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  private final ExecutorService writeOperationExecutor;
   private ListenableFuture<TSStatus> writeOperationFuture;
 
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
+    this.writeOperationExecutor = intoOperationExecutor;
   }
 
   protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -144,14 +141,12 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       client = new DataNodeInternalClient(operatorContext.getSessionInfo());
     }
 
-    isBlocked = SettableFuture.create();
     writeOperationFuture =
-        writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
-    writeOperationFuture.addListener(
-        () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
+        Futures.submit(
+            () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
   }
 
-  protected boolean writeOperationDone() {
+  protected boolean handleWriteOperationFuture() {
     if (writeOperationFuture == null) {
       return true;
     }
@@ -220,9 +215,26 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  private boolean writeOperationDone() {
+    if (writeOperationFuture == null) {
+      return true;
+    }
+
+    return writeOperationFuture.isDone();
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
-    return successfulAsList(Arrays.asList(isBlocked, child.isBlocked()));
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    if (writeOperationDone() && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (!writeOperationDone() && childBlocked.isDone()) {
+      return writeOperationFuture;
+    } else if (writeOperationDone() && !childBlocked.isDone()) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked));
+    }
   }
 
   @Override
@@ -237,7 +249,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     if (client != null) {
       client.close();
     }
-    writeOperationExecutor.shutdown();
+    if (writeOperationFuture != null) {
+      writeOperationFuture.cancel(true);
+    }
     child.close();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e36bb49168..82214a562b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class DeviceViewIntoOperator extends AbstractIntoOperator {
@@ -60,8 +61,9 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
-    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -76,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!writeOperationDone()) {
+    if (!handleWriteOperationFuture()) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 67a4fb0b0e..9eb6c14886 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class IntoOperator extends AbstractIntoOperator {
@@ -48,19 +49,21 @@ public class IntoOperator extends AbstractIntoOperator {
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
-      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      ExecutorService intoOperationExecutor) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
   @Override
   public TsBlock next() {
-    if (!writeOperationDone()) {
+    if (!handleWriteOperationFuture()) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 4a3d844b73..e2497d44c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -169,4 +170,8 @@ public class LocalExecutionPlanContext {
   public long getDataRegionTTL() {
     return dataRegionTTL;
   }
+
+  public ExecutorService getIntoOperationExecutor() {
+    return instanceContext.getIntoOperationExecutor(); // executor handed to the Into operators
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 484a342715..a6c2ebd261 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1383,7 +1383,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         targetPathToDataTypeMap,
         intoPathDescriptor.getTargetDeviceToAlignedMap(),
         intoPathDescriptor.getSourceTargetPathPairList(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor());
   }
 
   @Override
@@ -1433,7 +1434,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceToTargetPathDataTypeMap,
         deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
-        sourceColumnToInputLocationMap);
+        sourceColumnToInputLocationMap,
+        context.getIntoOperationExecutor());
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {


[iotdb] 09/09: fix memory calculation

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64da044e25a7fd42e527d24b7f49f1195d0d5d8d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 20:53:47 2022 +0800

    fix memory calculation
---
 .../operator/process/AbstractIntoOperator.java     | 16 ++++--
 .../operator/process/DeviceViewIntoOperator.java   | 13 ++++-
 .../execution/operator/process/IntoOperator.java   |  8 ++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 66 +++++++++++++++++++++-
 4 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index dd117e6d97..6b17fdc625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -68,17 +68,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   private final ExecutorService writeOperationExecutor;
   private ListenableFuture<TSStatus> writeOperationFuture;
 
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
     this.writeOperationExecutor = intoOperationExecutor;
+
+    this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
+    this.maxReturnSize = maxReturnSize;
   }
 
   protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -260,17 +268,17 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
+    return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
+    return maxReturnSize;
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
   }
 
   public static class InsertTabletStatementGenerator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 07cd3c0a0a..183d7d6140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -62,8 +62,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
-    super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
+    super(
+        operatorContext,
+        child,
+        null,
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor,
+        maxStatementSize,
+        maxReturnSize);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 5aba97e22a..89953edc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -50,14 +50,18 @@ public class IntoOperator extends AbstractIntoOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
         sourceColumnToInputLocationMap,
-        intoOperationExecutor);
+        intoOperationExecutor,
+        maxStatementSize,
+        maxReturnSize);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a6c2ebd261..a858f56833 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -170,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
@@ -177,6 +179,8 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
@@ -1375,7 +1379,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         sourceColumnToInputLocationMap,
         context.getTypeProvider());
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+    long maxReturnSize =
+        node.getChild().getOutputColumnNames().size()
+            * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+                + IntColumn.SIZE_IN_BYTES_PER_POSITION
+                + 256 * Byte.BYTES);
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
     return new IntoOperator(
         operatorContext,
         child,
@@ -1384,7 +1398,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         intoPathDescriptor.getTargetDeviceToAlignedMap(),
         intoPathDescriptor.getSourceTargetPathPairList(),
         sourceColumnToInputLocationMap,
-        context.getIntoOperationExecutor());
+        context.getIntoOperationExecutor(),
+        maxStatementSize,
+        maxReturnSize);
   }
 
   @Override
@@ -1409,6 +1425,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         new HashMap<>();
     Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
         deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+    long statementSizePerLine = 0L;
     for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
         sourceDeviceToTargetPathMap.entrySet()) {
       String sourceDevice = deviceEntry.getKey();
@@ -1424,8 +1441,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       deviceToTargetPathSourceInputLocationMap.put(
           sourceDevice, targetPathToSourceInputLocationMap);
       deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+      statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap);
     }
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = statementSizePerLine * rowLimit;
+    long maxReturnSize =
+        deviceToTargetPathDataTypeMap.size()
+            * (node.getChild().getOutputColumnNames().size() - 1)
+            * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+                + IntColumn.SIZE_IN_BYTES_PER_POSITION
+                + 512 * Byte.BYTES);
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
@@ -1435,7 +1463,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
         sourceColumnToInputLocationMap,
-        context.getIntoOperationExecutor());
+        context.getIntoOperationExecutor(),
+        maxStatementSize,
+        maxReturnSize);
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
@@ -1469,6 +1499,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
   }
 
+  /** Sums the per-row byte estimate of every target measurement, starting from Long.BYTES. */
+  private long calculateStatementSizePerLine(
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
+    // Long.BYTES is the fixed per-row part (presumably the timestamp -- TODO confirm).
+    long sizePerLine = Long.BYTES;
+    for (Map<String, TSDataType> measurementToType : targetPathToDataTypeMap.values()) {
+      for (TSDataType dataType : measurementToType.values()) {
+        sizePerLine += getValueSizePerLine(dataType);
+      }
+    }
+    return sizePerLine;
+  }
+
+  private static long getValueSizePerLine(TSDataType tsDataType) { // per-value byte estimate
+    switch (tsDataType) {
+      case INT32:
+        return Integer.BYTES;
+      case INT64:
+        return Long.BYTES;
+      case FLOAT:
+        return Float.BYTES;
+      case DOUBLE:
+        return Double.BYTES;
+      case BOOLEAN:
+        return Byte.BYTES; // a boolean is counted as one byte
+      case TEXT: // no fixed width: size is obtained from StatisticsManager for an empty path
+        return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath());
+      default: // every other type is rejected
+        throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+    }
+  }
+
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children =


[iotdb] 01/09: revert queryThreadCount in IoTDBConfig

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b14c10e30238bae70767d52391592f2751726da4
Author: liuminghui233 <54...@qq.com>
AuthorDate: Fri Nov 25 12:27:27 2022 +0800

    revert queryThreadCount in IoTDBConfig
---
 server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0f923aef2e..c3c2da9f0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -317,7 +317,7 @@ public class IoTDBConfig {
   private int flushThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
-  private int queryThreadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
+  private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;