You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/02 04:18:32 UTC

[iotdb] 05/06: udf_min_fragment_number_to_trigger_parallel_execution

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 65e6715bbfedfd4ab492c50181d65ae6f471695a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Dec 2 12:10:21 2021 +0800

    udf_min_fragment_number_to_trigger_parallel_execution
---
 .../src/assembly/resources/conf/iotdb-engine.properties |  5 +++++
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 +++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 13 +++++++++++++
 .../iotdb/db/query/udf/core/layer/LayerBuilder.java     |  8 ++++++--
 4 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index aad2f9d..c736bd1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -828,6 +828,11 @@ timestamp_precision=ms
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # udf_root_dir=ext/udf
 
+# A UDTFPlan can be split into several fragment plans. When the number of fragment plans reaches
+# udf_min_fragment_number_to_trigger_parallel_execution, the executor triggers a parallel
+# execution. The property should be an integer and no less than 2.
+# udf_min_fragment_number_to_trigger_parallel_execution=2
+
 ####################
 ### Trigger Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0ef9fbe..6038fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -740,6 +740,13 @@ public class IoTDBConfig {
 
   private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB);
 
+  /**
+   * A UDTFPlan can be split into several fragment plans. When the number of fragment plans
+   * reaches udfMinFragmentNumberToTriggerParallelExecution, the executor triggers a parallel
+   * execution.
+   */
+  private int udfMinFragmentNumberToTriggerParallelExecution = 2;
+
   // time in nanosecond precision when starting up
   private long startUpNanosecond = System.nanoTime();
 
@@ -815,6 +822,16 @@ public class IoTDBConfig {
     this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl;
   }
 
+  public int getUdfMinFragmentNumberToTriggerParallelExecution() {
+    return udfMinFragmentNumberToTriggerParallelExecution;
+  }
+
+  public void setUdfMinFragmentNumberToTriggerParallelExecution(
+      int udfMinFragmentNumberToTriggerParallelExecution) {
+    this.udfMinFragmentNumberToTriggerParallelExecution =
+        udfMinFragmentNumberToTriggerParallelExecution;
+  }
+
   public int getConcurrentWritingTimePartition() {
     return concurrentWritingTimePartition;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 84f8051..a42572f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1200,6 +1200,13 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "select_into_insert_tablet_plan_row_limit",
                   String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+
+      // udf
+      conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+          Integer.parseInt(
+              properties.getProperty(
+                  "udf_min_fragment_number_to_trigger_parallel_execution",
+                  String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
@@ -1328,6 +1335,12 @@ public class IoTDBDescriptor {
                 + readerTransformerCollectorMemoryProportion);
       }
     }
+
+    conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+        Integer.parseInt(
+            properties.getProperty(
+                "udf_min_fragment_number_to_trigger_parallel_execution",
+                String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
   }
 
   private void loadTriggerProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index a61d872..05f1b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet;
@@ -38,6 +40,8 @@ import java.util.Map;
 
 public class LayerBuilder {
 
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
   private final long queryId;
   private final UDTFPlan udtfPlan;
   private final RawQueryInputLayer rawTimeSeriesInputLayer;
@@ -137,9 +141,9 @@ public class LayerBuilder {
     return resultColumnPointReaders;
   }
 
-  /** TODO: make it configurable */
   public boolean canBeSplitIntoFragments() {
-    return 2 <= fragmentDataSetIndexToLayerPointReaders.size();
+    return Math.max(2, CONFIG.getUdfMinFragmentNumberToTriggerParallelExecution())
+        <= fragmentDataSetIndexToLayerPointReaders.size(); // never split below 2 fragments
   }
 
   public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)