You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/02 04:18:32 UTC
[iotdb] 05/06: udf_min_fragment_number_to_trigger_parallel_execution
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 65e6715bbfedfd4ab492c50181d65ae6f471695a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Dec 2 12:10:21 2021 +0800
udf_min_fragment_number_to_trigger_parallel_execution
---
.../src/assembly/resources/conf/iotdb-engine.properties | 5 +++++
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 +++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 +++++++++++++
.../iotdb/db/query/udf/core/layer/LayerBuilder.java | 8 ++++++--
4 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index aad2f9d..c736bd1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -828,6 +828,11 @@ timestamp_precision=ms
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# udf_root_dir=ext/udf
+# A UDTFPlan can be split into several fragment plans. When the number of fragment plans is at
+# least udf_min_fragment_number_to_trigger_parallel_execution, the executor triggers parallel
+# execution. The property must be an integer greater than 1.
+# udf_min_fragment_number_to_trigger_parallel_execution=2
+
####################
### Trigger Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0ef9fbe..6038fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -740,6 +740,13 @@ public class IoTDBConfig {
private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB);
+ /**
+ * A UDTFPlan can be split into several fragment plans. When the number of fragment plans is at
+ * least udfMinFragmentNumberToTriggerParallelExecution, the executor triggers parallel
+ * execution.
+ */
+ private int udfMinFragmentNumberToTriggerParallelExecution = 2;
+
// time in nanosecond precision when starting up
private long startUpNanosecond = System.nanoTime();
@@ -815,6 +822,16 @@ public class IoTDBConfig {
this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl;
}
+ public int getUdfMinFragmentNumberToTriggerParallelExecution() {
+ return udfMinFragmentNumberToTriggerParallelExecution;
+ }
+
+ public void setUdfMinFragmentNumberToTriggerParallelExecution(
+ int udfMinFragmentNumberToTriggerParallelExecution) {
+ this.udfMinFragmentNumberToTriggerParallelExecution =
+ udfMinFragmentNumberToTriggerParallelExecution;
+ }
+
public int getConcurrentWritingTimePartition() {
return concurrentWritingTimePartition;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 84f8051..a42572f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1200,6 +1200,13 @@ public class IoTDBDescriptor {
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+
+ // udf
+ conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+ Integer.parseInt(
+ properties.getProperty(
+ "udf_min_fragment_number_to_trigger_parallel_execution",
+ String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
@@ -1328,6 +1335,12 @@ public class IoTDBDescriptor {
+ readerTransformerCollectorMemoryProportion);
}
}
+
+ conf.setUdfMinFragmentNumberToTriggerParallelExecution(
+ Integer.parseInt(
+ properties.getProperty(
+ "udf_min_fragment_number_to_trigger_parallel_execution",
+ String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution()))));
}
private void loadTriggerProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index a61d872..05f1b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.query.udf.core.layer;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet;
@@ -38,6 +40,8 @@ import java.util.Map;
public class LayerBuilder {
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
private final long queryId;
private final UDTFPlan udtfPlan;
private final RawQueryInputLayer rawTimeSeriesInputLayer;
@@ -137,9 +141,9 @@ public class LayerBuilder {
return resultColumnPointReaders;
}
- /** TODO: make it configurable */
public boolean canBeSplitIntoFragments() {
- return 2 <= fragmentDataSetIndexToLayerPointReaders.size();
+ return Math.max(2, CONFIG.getUdfMinFragmentNumberToTriggerParallelExecution())
+ <= fragmentDataSetIndexToLayerPointReaders.size();
}
public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)