You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/15 08:22:22 UTC

[iotdb] branch nested-operations updated: fix datatype infer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/nested-operations by this push:
     new 2aad08e  fix datatype infer
2aad08e is described below

commit 2aad08ed0788d6c8bb5eecfa7e7f8ea017666b47
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 15 16:21:08 2021 +0800

    fix datatype infer
---
 .../apache/iotdb/db/qp/physical/crud/UDFPlan.java  |  5 ---
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 21 ----------
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 28 +++++++++----
 .../iotdb/db/query/expression/Expression.java      |  2 +
 .../query/expression/binary/BinaryExpression.java  | 13 +++---
 .../query/expression/unary/FunctionExpression.java | 16 ++++++-
 .../query/expression/unary/NegationExpression.java |  4 ++
 .../query/expression/unary/TimeSeriesOperand.java  |  3 +-
 .../api/customizer/parameter/UDFParameters.java    | 49 +++++++++++-----------
 .../db/query/udf/core/executor/UDTFExecutor.java   | 12 ++++--
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |  4 ++
 11 files changed, 86 insertions(+), 71 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
index f1ed19f..a60640b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDFPlan.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 
 import java.util.List;
@@ -39,10 +38,6 @@ public interface UDFPlan {
    */
   void constructUdfExecutors(List<ResultColumn> resultColumns);
 
-  /** Allocate computing resources, create UDF instances, and call UDF initialization methods. */
-  void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMb)
-      throws QueryProcessException;
-
   /** Call UDF finalization methods and release computing resources. */
   void finalizeUDFExecutors(long queryId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index b7a1a75..38c33cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.qp.physical.crud;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
@@ -32,14 +31,12 @@ import org.apache.iotdb.db.query.expression.unary.NegationExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
-import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -123,24 +120,6 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   }
 
   @Override
-  public void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMB)
-      throws QueryProcessException {
-    Collection<UDTFExecutor> executors = expressionName2Executor.values();
-    collectorMemoryBudgetInMB /= executors.size();
-
-    UDFRegistrationService.getInstance().acquireRegistrationLock();
-    // This statement must be surrounded by the registration lock.
-    UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
-    try {
-      for (UDTFExecutor executor : executors) {
-        executor.beforeStart(queryId, collectorMemoryBudgetInMB);
-      }
-    } finally {
-      UDFRegistrationService.getInstance().releaseRegistrationLock();
-    }
-  }
-
-  @Override
   public void finalizeUDFExecutors(long queryId) {
     try {
       for (UDTFExecutor executor : expressionName2Executor.values()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index f58a8de..09d41e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
 import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -74,7 +76,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
             timestampGenerator,
             readersOfSelectedSeries,
             cached);
-    udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
+
     initTransformers();
   }
 
@@ -96,16 +98,28 @@ public abstract class UDTFDataSet extends QueryDataSet {
             deduplicatedPaths,
             deduplicatedDataTypes,
             readersOfSelectedSeries);
-    udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
+
     initTransformers();
   }
 
   protected void initTransformers() throws QueryProcessException, IOException {
-    transformers =
-        new DAGBuilder(queryId, udtfPlan, rawQueryInputLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
-            .buildLayerMemoryAssigner()
-            .buildResultColumnPointReaders()
-            .getResultColumnPointReaders();
+    UDFRegistrationService.getInstance().acquireRegistrationLock();
+    // This statement must be surrounded by the registration lock.
+    UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
+    try {
+      // UDF executors will be initialized at the same time
+      transformers =
+          new DAGBuilder(
+                  queryId,
+                  udtfPlan,
+                  rawQueryInputLayer,
+                  UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+              .buildLayerMemoryAssigner()
+              .buildResultColumnPointReaders()
+              .getResultColumnPointReaders();
+    } finally {
+      UDFRegistrationService.getInstance().releaseRegistrationLock();
+    }
   }
 
   public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index cea4875..25ca6ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
 import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -65,6 +66,7 @@ public abstract class Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 99cfffc..9058adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -136,6 +137,7 @@ public abstract class BinaryExpression extends Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -147,6 +149,7 @@ public abstract class BinaryExpression extends Expression {
               udtfPlan,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
+              expressionDataTypeMap,
               memoryAssigner);
       IntermediateLayer rightParentIntermediateLayer =
           rightExpression.constructIntermediateLayer(
@@ -154,11 +157,13 @@ public abstract class BinaryExpression extends Expression {
               udtfPlan,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
+              expressionDataTypeMap,
               memoryAssigner);
       Transformer transformer =
           constructTransformer(
               leftParentIntermediateLayer.constructPointReader(),
               rightParentIntermediateLayer.constructPointReader());
+      expressionDataTypeMap.put(this, transformer.getDataType());
 
       expressionIntermediateLayerMap.put(
           this,
@@ -175,14 +180,6 @@ public abstract class BinaryExpression extends Expression {
   protected abstract ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
 
-  public Expression getLeftExpression() {
-    return leftExpression;
-  }
-
-  public Expression getRightExpression() {
-    return rightExpression;
-  }
-
   @Override
   public final String toString() {
     return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index adb703b..f720182 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -69,6 +70,8 @@ public class FunctionExpression extends Expression {
    */
   private List<Expression> expressions;
 
+  private List<TSDataType> inputExpressionTypes;
+
   private List<PartialPath> paths;
 
   private String parametersString;
@@ -187,6 +190,7 @@ public class FunctionExpression extends Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -198,9 +202,12 @@ public class FunctionExpression extends Expression {
               udtfPlan,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
+              expressionDataTypeMap,
               memoryAssigner);
       Transformer transformer =
-          constructUdfTransformer(udtfPlan, memoryAssigner, udfInputIntermediateLayer);
+          constructUdfTransformer(
+              queryId, udtfPlan, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer);
+      expressionDataTypeMap.put(this, transformer.getDataType());
 
       expressionIntermediateLayerMap.put(
           this,
@@ -219,6 +226,7 @@ public class FunctionExpression extends Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
     List<IntermediateLayer> intermediateLayers = new ArrayList<>();
@@ -229,6 +237,7 @@ public class FunctionExpression extends Expression {
               udtfPlan,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
+              expressionDataTypeMap,
               memoryAssigner));
     }
     return intermediateLayers.size() == 1
@@ -243,11 +252,16 @@ public class FunctionExpression extends Expression {
   }
 
   private UDFQueryTransformer constructUdfTransformer(
+      long queryId,
       UDTFPlan udtfPlan,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner,
       IntermediateLayer udfInputIntermediateLayer)
       throws QueryProcessException, IOException {
     UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+
+    executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
+
     AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
     switch (accessStrategy.getAccessStrategyType()) {
       case ROW_BY_ROW:
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 0e33cd8..a380852 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceI
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -100,6 +101,7 @@ public class NegationExpression extends Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -111,9 +113,11 @@ public class NegationExpression extends Expression {
               udtfPlan,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
+              expressionDataTypeMap,
               memoryAssigner);
       Transformer transformer =
           new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+      expressionDataTypeMap.put(this, transformer.getDataType());
 
       expressionIntermediateLayerMap.put(
           this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 55a79b8..b01f608 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -42,7 +42,6 @@ import java.util.Set;
 public class TimeSeriesOperand extends Expression {
 
   protected PartialPath path;
-  protected TSDataType dataType;
 
   public TimeSeriesOperand(PartialPath path) {
     this.path = path;
@@ -93,6 +92,7 @@ public class TimeSeriesOperand extends Expression {
       UDTFPlan udtfPlan,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -100,6 +100,7 @@ public class TimeSeriesOperand extends Expression {
 
       LayerPointReader parentLayerPointReader =
           rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+      expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
 
       expressionIntermediateLayerMap.put(
           this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
index 148fa6d..e84ad24 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/parameter/UDFParameters.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.db.query.udf.api.customizer.parameter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.udf.api.UDTF;
 import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
-import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -43,51 +46,47 @@ import java.util.Map;
  */
 public class UDFParameters {
 
+  private final List<Expression> expressions;
   private final List<PartialPath> paths;
   private final Map<String, String> attributes;
+  private final List<TSDataType> dataTypes;
+
+  public UDFParameters(
+      FunctionExpression functionExpression, Map<Expression, TSDataType> expressionDataTypeMap)
+      throws QueryProcessException {
+    expressions = functionExpression.getExpressions();
+    paths = functionExpression.getPaths();
+    attributes = functionExpression.getFunctionAttributes();
+    dataTypes = new ArrayList<>();
+    for (Expression expression : expressions) {
+      dataTypes.add(expressionDataTypeMap.get(expression));
+    }
+  }
 
-  private List<TSDataType> dataTypes;
-
-  public UDFParameters(List<PartialPath> paths, Map<String, String> attributes) {
-    this.paths = paths;
-    this.attributes = attributes;
-    dataTypes = null;
+  public List<Expression> getExpressions() {
+    return expressions;
   }
 
   public List<PartialPath> getPaths() {
     return paths;
   }
 
-  public List<TSDataType> getDataTypes() throws MetadataException {
-    if (dataTypes == null) {
-      dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
-    }
-    return dataTypes;
-  }
-
   public Map<String, String> getAttributes() {
     return attributes;
   }
 
+  public List<TSDataType> getDataTypes() throws MetadataException {
+    return dataTypes;
+  }
+
   public PartialPath getPath(int index) {
     return paths.get(index);
   }
 
   public TSDataType getDataType(int index) throws MetadataException {
-    if (dataTypes == null) {
-      dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
-    }
     return dataTypes.get(index);
   }
 
-  public TSDataType getDataType(String path) throws MetadataException {
-    return SchemaUtils.getSeriesTypeByPath(new PartialPath(path));
-  }
-
-  public TSDataType getDataType(PartialPath path) throws MetadataException {
-    return SchemaUtils.getSeriesTypeByPath(path);
-  }
-
   public boolean hasAttribute(String attributeKey) {
     return attributes.containsKey(attributeKey);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
index 59b0a2d..48b94f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.udf.core.executor;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.udf.api.UDTF;
 import org.apache.iotdb.db.query.udf.api.access.Row;
@@ -29,13 +30,16 @@ import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValida
 import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
 import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.time.ZoneId;
+import java.util.Map;
 
 public class UDTFExecutor {
 
   protected final FunctionExpression expression;
   protected final UDTFConfigurations configurations;
+
   protected UDTF udtf;
   protected ElasticSerializableTVList collector;
 
@@ -44,12 +48,14 @@ public class UDTFExecutor {
     configurations = new UDTFConfigurations(zoneId);
   }
 
-  public void beforeStart(long queryId, float collectorMemoryBudgetInMB)
+  public void beforeStart(
+      long queryId,
+      float collectorMemoryBudgetInMB,
+      Map<Expression, TSDataType> expressionDataTypeMap)
       throws QueryProcessException {
     udtf = (UDTF) UDFRegistrationService.getInstance().reflect(expression);
 
-    UDFParameters parameters =
-        new UDFParameters(expression.getPaths(), expression.getFunctionAttributes());
+    UDFParameters parameters = new UDFParameters(expression, expressionDataTypeMap);
 
     try {
       udtf.validate(new UDFParameterValidator(parameters));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 1a33c17..a146826 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -46,6 +47,7 @@ public class DAGBuilder {
   // sub-expressions, but they can share the same point reader. we cache the point reader here to
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
+  private final Map<Expression, TSDataType> expressionDataTypeMap;
 
   public DAGBuilder(
       long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
@@ -63,6 +65,7 @@ public class DAGBuilder {
     memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
 
     expressionIntermediateLayerMap = new HashMap<>();
+    expressionDataTypeMap = new HashMap<>();
   }
 
   public DAGBuilder buildLayerMemoryAssigner() {
@@ -82,6 +85,7 @@ public class DAGBuilder {
                   udtfPlan,
                   rawTimeSeriesInputLayer,
                   expressionIntermediateLayerMap,
+                  expressionDataTypeMap,
                   memoryAssigner)
               .constructPointReader();
     }