You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/15 08:22:22 UTC
[iotdb] branch nested-operations updated: fix datatype infer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/nested-operations by this push:
new 2aad08e fix datatype infer
2aad08e is described below
commit 2aad08ed0788d6c8bb5eecfa7e7f8ea017666b47
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 15 16:21:08 2021 +0800
fix datatype infer
---
.../apache/iotdb/db/qp/physical/crud/UDFPlan.java | 5 ---
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 21 ----------
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 28 +++++++++----
.../iotdb/db/query/expression/Expression.java | 2 +
.../query/expression/binary/BinaryExpression.java | 13 +++---
.../query/expression/unary/FunctionExpression.java | 16 ++++++-
.../query/expression/unary/NegationExpression.java | 4 ++
.../query/expression/unary/TimeSeriesOperand.java | 3 +-
.../api/customizer/parameter/UDFParameters.java | 49 +++++++++++-----------
.../db/query/udf/core/executor/UDTFExecutor.java | 12 ++++--
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 4 ++
11 files changed, 86 insertions(+), 71 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
index f1ed19f..a60640b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.qp.physical.crud;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.expression.ResultColumn;
import java.util.List;
@@ -39,10 +38,6 @@ public interface UDFPlan {
*/
void constructUdfExecutors(List<ResultColumn> resultColumns);
- /** Allocate computing resources, create UDF instances, and call UDF initialization methods. */
- void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMb)
- throws QueryProcessException;
-
/** Call UDF finalization methods and release computing resources. */
void finalizeUDFExecutors(long queryId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index b7a1a75..38c33cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.qp.physical.crud;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
@@ -32,14 +31,12 @@ import org.apache.iotdb.db.query.expression.unary.NegationExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
-import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -123,24 +120,6 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
}
@Override
- public void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMB)
- throws QueryProcessException {
- Collection<UDTFExecutor> executors = expressionName2Executor.values();
- collectorMemoryBudgetInMB /= executors.size();
-
- UDFRegistrationService.getInstance().acquireRegistrationLock();
- // This statement must be surrounded by the registration lock.
- UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
- try {
- for (UDTFExecutor executor : executors) {
- executor.beforeStart(queryId, collectorMemoryBudgetInMB);
- }
- } finally {
- UDFRegistrationService.getInstance().releaseRegistrationLock();
- }
- }
-
- @Override
public void finalizeUDFExecutors(long queryId) {
try {
for (UDTFExecutor executor : expressionName2Executor.values()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index f58a8de..09d41e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -74,7 +76,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
timestampGenerator,
readersOfSelectedSeries,
cached);
- udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
+
initTransformers();
}
@@ -96,16 +98,28 @@ public abstract class UDTFDataSet extends QueryDataSet {
deduplicatedPaths,
deduplicatedDataTypes,
readersOfSelectedSeries);
- udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
+
initTransformers();
}
protected void initTransformers() throws QueryProcessException, IOException {
- transformers =
- new DAGBuilder(queryId, udtfPlan, rawQueryInputLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
- .buildLayerMemoryAssigner()
- .buildResultColumnPointReaders()
- .getResultColumnPointReaders();
+ UDFRegistrationService.getInstance().acquireRegistrationLock();
+ // This statement must be surrounded by the registration lock.
+ UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
+ try {
+ // UDF executors will be initialized at the same time
+ transformers =
+ new DAGBuilder(
+ queryId,
+ udtfPlan,
+ rawQueryInputLayer,
+ UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+ .buildLayerMemoryAssigner()
+ .buildResultColumnPointReaders()
+ .getResultColumnPointReaders();
+ } finally {
+ UDFRegistrationService.getInstance().releaseRegistrationLock();
+ }
}
public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index cea4875..25ca6ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.time.ZoneId;
@@ -65,6 +66,7 @@ public abstract class Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 99cfffc..9058adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.time.ZoneId;
@@ -136,6 +137,7 @@ public abstract class BinaryExpression extends Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -147,6 +149,7 @@ public abstract class BinaryExpression extends Expression {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner);
IntermediateLayer rightParentIntermediateLayer =
rightExpression.constructIntermediateLayer(
@@ -154,11 +157,13 @@ public abstract class BinaryExpression extends Expression {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner);
Transformer transformer =
constructTransformer(
leftParentIntermediateLayer.constructPointReader(),
rightParentIntermediateLayer.constructPointReader());
+ expressionDataTypeMap.put(this, transformer.getDataType());
expressionIntermediateLayerMap.put(
this,
@@ -175,14 +180,6 @@ public abstract class BinaryExpression extends Expression {
protected abstract ArithmeticBinaryTransformer constructTransformer(
LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
- public Expression getLeftExpression() {
- return leftExpression;
- }
-
- public Expression getRightExpression() {
- return rightExpression;
- }
-
@Override
public final String toString() {
return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index adb703b..f720182 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.time.ZoneId;
@@ -69,6 +70,8 @@ public class FunctionExpression extends Expression {
*/
private List<Expression> expressions;
+ private List<TSDataType> inputExpressionTypes;
+
private List<PartialPath> paths;
private String parametersString;
@@ -187,6 +190,7 @@ public class FunctionExpression extends Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -198,9 +202,12 @@ public class FunctionExpression extends Expression {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner);
Transformer transformer =
- constructUdfTransformer(udtfPlan, memoryAssigner, udfInputIntermediateLayer);
+ constructUdfTransformer(
+ queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
+ expressionDataTypeMap.put(this, transformer.getDataType());
expressionIntermediateLayerMap.put(
this,
@@ -219,6 +226,7 @@ public class FunctionExpression extends Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
List<IntermediateLayer> intermediateLayers = new ArrayList<>();
@@ -229,6 +237,7 @@ public class FunctionExpression extends Expression {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner));
}
return intermediateLayers.size() == 1
@@ -243,11 +252,16 @@ public class FunctionExpression extends Expression {
}
private UDFQueryTransformer constructUdfTransformer(
+ long queryId,
UDTFPlan udtfPlan,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner,
IntermediateLayer udfInputIntermediateLayer)
throws QueryProcessException, IOException {
UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+
+ executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
+
AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
switch (accessStrategy.getAccessStrategyType()) {
case ROW_BY_ROW:
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 0e33cd8..a380852 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceI
import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.time.ZoneId;
@@ -100,6 +101,7 @@ public class NegationExpression extends Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -111,9 +113,11 @@ public class NegationExpression extends Expression {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner);
Transformer transformer =
new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+ expressionDataTypeMap.put(this, transformer.getDataType());
expressionIntermediateLayerMap.put(
this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 55a79b8..b01f608 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -42,7 +42,6 @@ import java.util.Set;
public class TimeSeriesOperand extends Expression {
protected PartialPath path;
- protected TSDataType dataType;
public TimeSeriesOperand(PartialPath path) {
this.path = path;
@@ -93,6 +92,7 @@ public class TimeSeriesOperand extends Expression {
UDTFPlan udtfPlan,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -100,6 +100,7 @@ public class TimeSeriesOperand extends Expression {
LayerPointReader parentLayerPointReader =
rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+ expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
index 148fa6d..e84ad24 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.db.query.udf.api.customizer.parameter;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.udf.api.UDTF;
import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
-import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -43,51 +46,47 @@ import java.util.Map;
*/
public class UDFParameters {
+ private final List<Expression> expressions;
private final List<PartialPath> paths;
private final Map<String, String> attributes;
+ private final List<TSDataType> dataTypes;
+
+ public UDFParameters(
+ FunctionExpression functionExpression, Map<Expression, TSDataType> expressionDataTypeMap)
+ throws QueryProcessException {
+ expressions = functionExpression.getExpressions();
+ paths = functionExpression.getPaths();
+ attributes = functionExpression.getFunctionAttributes();
+ dataTypes = new ArrayList<>();
+ for (Expression expression : expressions) {
+ dataTypes.add(expressionDataTypeMap.get(expression));
+ }
+ }
- private List<TSDataType> dataTypes;
-
- public UDFParameters(List<PartialPath> paths, Map<String, String> attributes) {
- this.paths = paths;
- this.attributes = attributes;
- dataTypes = null;
+ public List<Expression> getExpressions() {
+ return expressions;
}
public List<PartialPath> getPaths() {
return paths;
}
- public List<TSDataType> getDataTypes() throws MetadataException {
- if (dataTypes == null) {
- dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
- }
- return dataTypes;
- }
-
public Map<String, String> getAttributes() {
return attributes;
}
+ public List<TSDataType> getDataTypes() throws MetadataException {
+ return dataTypes;
+ }
+
public PartialPath getPath(int index) {
return paths.get(index);
}
public TSDataType getDataType(int index) throws MetadataException {
- if (dataTypes == null) {
- dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
- }
return dataTypes.get(index);
}
- public TSDataType getDataType(String path) throws MetadataException {
- return SchemaUtils.getSeriesTypeByPath(new PartialPath(path));
- }
-
- public TSDataType getDataType(PartialPath path) throws MetadataException {
- return SchemaUtils.getSeriesTypeByPath(path);
- }
-
public boolean hasAttribute(String attributeKey) {
return attributes.containsKey(attributeKey);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
index 59b0a2d..48b94f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.query.udf.core.executor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.udf.api.UDTF;
import org.apache.iotdb.db.query.udf.api.access.Row;
@@ -29,13 +30,16 @@ import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValida
import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.time.ZoneId;
+import java.util.Map;
public class UDTFExecutor {
protected final FunctionExpression expression;
protected final UDTFConfigurations configurations;
+
protected UDTF udtf;
protected ElasticSerializableTVList collector;
@@ -44,12 +48,14 @@ public class UDTFExecutor {
configurations = new UDTFConfigurations(zoneId);
}
- public void beforeStart(long queryId, float collectorMemoryBudgetInMB)
+ public void beforeStart(
+ long queryId,
+ float collectorMemoryBudgetInMB,
+ Map<Expression, TSDataType> expressionDataTypeMap)
throws QueryProcessException {
udtf = (UDTF) UDFRegistrationService.getInstance().reflect(expression);
- UDFParameters parameters =
- new UDFParameters(expression.getPaths(), expression.getFunctionAttributes());
+ UDFParameters parameters = new UDFParameters(expression, expressionDataTypeMap);
try {
udtf.validate(new UDFParameterValidator(parameters));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 1a33c17..a146826 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.util.HashMap;
@@ -46,6 +47,7 @@ public class DAGBuilder {
// sub-expressions, but they can share the same point reader. we cache the point reader here to
// make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
+ private final Map<Expression, TSDataType> expressionDataTypeMap;
public DAGBuilder(
long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
@@ -63,6 +65,7 @@ public class DAGBuilder {
memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
expressionIntermediateLayerMap = new HashMap<>();
+ expressionDataTypeMap = new HashMap<>();
}
public DAGBuilder buildLayerMemoryAssigner() {
@@ -82,6 +85,7 @@ public class DAGBuilder {
udtfPlan,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
+ expressionDataTypeMap,
memoryAssigner)
.constructPointReader();
}