You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:01:19 UTC

[iotdb] 03/03: bind expr with input column index in another way

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 14491af4dfe18a4118f184cbef41f198b757f103
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 21:00:36 2022 +0800

    bind expr with input column index in another way
---
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 37 ++++---------
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  3 +-
 .../iotdb/db/query/expression/Expression.java      | 15 ++----
 .../query/expression/binary/BinaryExpression.java  | 14 +++--
 .../db/query/expression/unary/ConstantOperand.java |  8 ++-
 .../query/expression/unary/FunctionExpression.java | 26 ++++++----
 .../query/expression/unary/LogicNotExpression.java | 17 ++++--
 .../query/expression/unary/NegationExpression.java | 11 +++-
 .../query/expression/unary/TimeSeriesOperand.java  | 10 +++-
 .../db/query/udf/core/executor/UDTFContext.java    | 60 ++++++++++++++++++++++
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |  9 +++-
 .../query/udf/core/layer/RawQueryInputLayer.java   |  4 ++
 12 files changed, 156 insertions(+), 58 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 976bfec456..e1c9dbadb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -43,15 +41,14 @@ import java.util.Set;
 
 public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
-  protected final ZoneId zoneId;
+  protected final UDTFContext udtfContext;
 
-  protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
-  protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
-  protected Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
+  protected final Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
+  protected final Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
 
   public UDTFPlan(ZoneId zoneId) {
     super();
-    this.zoneId = zoneId;
+    udtfContext = new UDTFContext(zoneId);
     setOperatorType(Operator.OperatorType.UDTF);
   }
 
@@ -128,35 +125,23 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
   @Override
   public void constructUdfExecutors(List<ResultColumn> resultColumns) {
-    for (ResultColumn resultColumn : resultColumns) {
-      resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
-    }
+    udtfContext.constructUdfExecutors(resultColumns);
   }
 
   @Override
   public void finalizeUDFExecutors(long queryId) {
-    try {
-      for (UDTFExecutor executor : expressionName2Executor.values()) {
-        executor.beforeDestroy();
-      }
-    } finally {
-      UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
-    }
+    udtfContext.finalizeUDFExecutors(queryId);
   }
 
   public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) {
     return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex));
   }
 
-  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
-    return expressionName2Executor.get(functionExpression.getExpressionString());
-  }
-
-  public int getReaderIndex(String pathName) {
-    return pathNameToReaderIndex.get(pathName);
+  public Integer getReaderIndexByExpressionName(String expressionName) {
+    return pathNameToReaderIndex.get(expressionName);
   }
 
-  public int getReaderIndexByExpressionName(String expressionName) {
-    return pathNameToReaderIndex.get(expressionName);
+  public UDTFContext getUdtfContext() {
+    return udtfContext;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 1fe27e7c14..c299a355f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
     initDataSetFields();
   }
 
-  protected UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
+  public UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
       throws QueryProcessException, IOException {
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
@@ -123,6 +123,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
                   udtfPlan,
                   rawQueryInputLayer,
                   UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+              .bindInputLayerColumnIndexWithExpression()
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
               .setDataSetResultColumnDataTypes()
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 619167bf96..48f231c8fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -49,7 +50,7 @@ public abstract class Expression {
 
   protected Boolean isConstantOperandCache = null;
 
-  protected Integer tsBlockInputColumnIndex = null;
+  protected Integer inputColumnIndex = null;
 
   public boolean isBuiltInAggregationFunctionExpression() {
     return false;
@@ -63,14 +64,6 @@ public abstract class Expression {
     return false;
   }
 
-  public Integer getTsBlockInputColumnIndex() {
-    return tsBlockInputColumnIndex;
-  }
-
-  public void setTsBlockInputColumnIndex(Integer tsBlockInputColumnIndex) {
-    this.tsBlockInputColumnIndex = tsBlockInputColumnIndex;
-  }
-
   public abstract void concat(
       List<PartialPath> prefixPaths,
       List<Expression> resultExpressions,
@@ -94,11 +87,13 @@ public abstract class Expression {
   public abstract void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
 
+  public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan);
+
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
   public abstract IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index b706a52f5d..18eca2151b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -205,6 +206,13 @@ public abstract class BinaryExpression extends Expression {
     rightExpression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -215,7 +223,7 @@ public abstract class BinaryExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -227,7 +235,7 @@ public abstract class BinaryExpression extends Expression {
       IntermediateLayer leftParentIntermediateLayer =
           leftExpression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
@@ -235,7 +243,7 @@ public abstract class BinaryExpression extends Expression {
       IntermediateLayer rightParentIntermediateLayer =
           rightExpression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0d32cfbad5..e8e0088599 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.ConstantIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
@@ -102,6 +103,11 @@ public class ConstantOperand extends Expression {
     // Do nothing
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    // Do nothing
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     // Do nothing
@@ -110,7 +116,7 @@ public class ConstantOperand extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 564ce5bb09..3683ff50fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -246,6 +247,14 @@ public class FunctionExpression extends Expression {
     expressionName2Executor.put(expressionString, new UDTFExecutor(this, zoneId));
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    for (Expression expression : expressions) {
+      expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    }
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     for (Expression expression : expressions) {
@@ -257,7 +266,7 @@ public class FunctionExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -269,13 +278,12 @@ public class FunctionExpression extends Expression {
       if (isBuiltInAggregationFunctionExpression) {
         transformer =
             new TransparentTransformer(
-                rawTimeSeriesInputLayer.constructPointReader(
-                    udtfPlan.getReaderIndexByExpressionName(toString())));
+                rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex));
       } else {
         IntermediateLayer udfInputIntermediateLayer =
             constructUdfInputIntermediateLayer(
                 queryId,
-                udtfPlan,
+                udtfContext,
                 rawTimeSeriesInputLayer,
                 expressionIntermediateLayerMap,
                 expressionDataTypeMap,
@@ -283,7 +291,7 @@ public class FunctionExpression extends Expression {
         transformer =
             constructUdfTransformer(
                 queryId,
-                udtfPlan,
+                udtfContext,
                 expressionDataTypeMap,
                 memoryAssigner,
                 udfInputIntermediateLayer);
@@ -303,7 +311,7 @@ public class FunctionExpression extends Expression {
 
   private IntermediateLayer constructUdfInputIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -314,7 +322,7 @@ public class FunctionExpression extends Expression {
       intermediateLayers.add(
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
@@ -333,12 +341,12 @@ public class FunctionExpression extends Expression {
 
   private UDFQueryTransformer constructUdfTransformer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner,
       IntermediateLayer udfInputIntermediateLayer)
       throws QueryProcessException, IOException {
-    UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+    UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this);
 
     executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 66893a0172..33fb9cb5a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -39,7 +40,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class LogicNotExpression extends Expression {
   protected Expression expression;
@@ -127,6 +132,12 @@ public class LogicNotExpression extends Expression {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -136,7 +147,7 @@ public class LogicNotExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -148,7 +159,7 @@ public class LogicNotExpression extends Expression {
       IntermediateLayer parentLayerPointReader =
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 04065daa55..5b8dccf1ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -132,6 +133,12 @@ public class NegationExpression extends Expression {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -141,7 +148,7 @@ public class NegationExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -153,7 +160,7 @@ public class NegationExpression extends Expression {
       IntermediateLayer parentLayerPointReader =
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a9c1052c01..adcb7dff6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -116,6 +117,11 @@ public class TimeSeriesOperand extends Expression {
     // nothing to do
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     memoryAssigner.increaseExpressionReference(this);
@@ -124,7 +130,7 @@ public class TimeSeriesOperand extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -134,7 +140,7 @@ public class TimeSeriesOperand extends Expression {
       float memoryBudgetInMB = memoryAssigner.assign();
 
       LayerPointReader parentLayerPointReader =
-          rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+          rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
       expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
 
       expressionIntermediateLayerMap.put(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
new file mode 100644
index 0000000000..cb7d467403
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.executor;
+
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UDTFContext {
+
+  protected final ZoneId zoneId;
+
+  protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
+
+  public UDTFContext(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+
+  public void constructUdfExecutors(List<ResultColumn> resultColumns) {
+    for (ResultColumn resultColumn : resultColumns) {
+      resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
+    }
+  }
+
+  public void finalizeUDFExecutors(long queryId) {
+    try {
+      for (UDTFExecutor executor : expressionName2Executor.values()) {
+        executor.beforeDestroy();
+      }
+    } finally {
+      UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
+    }
+  }
+
+  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+    return expressionName2Executor.get(functionExpression.getExpressionString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 184753489d..270482b1e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -69,6 +69,13 @@ public class DAGBuilder {
     expressionDataTypeMap = new HashMap<>();
   }
 
+  public DAGBuilder bindInputLayerColumnIndexWithExpression() {
+    for (Expression expression : resultColumnExpressions) {
+      expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    }
+    return this;
+  }
+
   public DAGBuilder buildLayerMemoryAssigner() {
     for (Expression expression : resultColumnExpressions) {
       expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -83,7 +90,7 @@ public class DAGBuilder {
           resultColumnExpressions[i]
               .constructIntermediateLayer(
                   queryId,
-                  udtfPlan,
+                  udtfPlan.getUdtfContext(),
                   rawTimeSeriesInputLayer,
                   expressionIntermediateLayerMap,
                   expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index d462da438c..fec8fd71b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -94,6 +94,10 @@ public class RawQueryInputLayer {
     rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
   }
 
+  public int getInputColumnCount() {
+    return dataTypes.length;
+  }
+
   public LayerPointReader constructPointReader(int columnIndex) {
     return new InputLayerPointReader(columnIndex);
   }