You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:01:16 UTC

[iotdb] branch udf-operator created (now 14491af4df)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 14491af4df bind expr with input column index in another way

This branch includes the following new commits:

     new 9a7199c8b1 Expression: tsBlockInputColumnIndex
     new 2e7d195563 TsBlockInputDataSet
     new 14491af4df bind expr with input column index in another way

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/03: TsBlockInputDataSet

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e7d195563dde8bbffe68bd44e654cf603958cef
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 19:58:59 2022 +0800

    TsBlockInputDataSet
---
 .../db/mpp/operator/source/SeriesScanUtil.java     | 14 ++--
 .../query/udf/core/layer/TsBlockInputDataSet.java  | 64 ++++++++++++++++++
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 78 +++++++++++++++++-----
 .../read/common/block/column/BinaryColumn.java     |  5 ++
 .../read/common/block/column/BooleanColumn.java    |  5 ++
 .../tsfile/read/common/block/column/Column.java    |  5 ++
 .../read/common/block/column/DoubleColumn.java     |  5 ++
 .../read/common/block/column/FloatColumn.java      |  5 ++
 .../tsfile/read/common/block/column/IntColumn.java |  5 ++
 .../read/common/block/column/LongColumn.java       |  5 ++
 .../block/column/RunLengthEncodedColumn.java       |  6 ++
 .../read/common/block/column/TimeColumn.java       |  5 ++
 12 files changed, 182 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 80e0a67771..f65947a45d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -47,7 +47,13 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 
@@ -705,7 +711,7 @@ public class SeriesScanUtil {
               mergeReader.addReader(
                   firstPageReader
                       .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockIterator(),
+                      .getTsBlockColumnIterator(),
                   firstPageReader.version,
                   orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
                   context);
@@ -732,7 +738,7 @@ public class SeriesScanUtil {
               mergeReader.addReader(
                   pageReader
                       .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockIterator(),
+                      .getTsBlockColumnIterator(),
                   pageReader.version,
                   orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
                   context);
@@ -913,7 +919,7 @@ public class SeriesScanUtil {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
     mergeReader.addReader(
-        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockIterator(),
+        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockColumnIterator(),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
         context);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
new file mode 100644
index 0000000000..a12e5a0cab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockRowIterator;
+
+import java.util.List;
+
+public class TsBlockInputDataSet implements IUDFInputDataSet {
+
+  private final Operator operator;
+  private final List<TSDataType> dataTypes;
+
+  private TsBlockRowIterator tsBlockRowIterator;
+
+  public TsBlockInputDataSet(Operator operator, List<TSDataType> dataTypes) {
+    this.operator = operator;
+    this.dataTypes = dataTypes;
+  }
+
+  @Override
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  @Override
+  public boolean hasNextRowInObjects() {
+    if (tsBlockRowIterator != null && tsBlockRowIterator.hasNext()) {
+      return true;
+    }
+
+    if (!operator.hasNext()) {
+      return false;
+    }
+
+    tsBlockRowIterator = operator.next().getTsBlockRowIterator();
+    return true;
+  }
+
+  @Override
+  public Object[] nextRowInObjects() {
+    return tsBlockRowIterator.next();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 29a17a1a79..261c6337d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.openjdk.jol.info.ClassLayout;
 
 import java.util.Arrays;
+import java.util.Iterator;
 
 import static io.airlift.slice.SizeOf.sizeOf;
 import static java.lang.String.format;
@@ -173,8 +174,16 @@ public class TsBlock {
     return valueColumns[columnIndex];
   }
 
-  public TsBlockIterator getTsBlockIterator() {
-    return new TsBlockIterator(0);
+  public TsBlockColumnIterator getTsBlockColumnIterator() {
+    return new TsBlockColumnIterator(0);
+  }
+
+  public TsBlockColumnIterator getTsBlockColumnIterator(int columnIndex) {
+    return new TsBlockColumnIterator(0, columnIndex);
+  }
+
+  public TsBlockRowIterator getTsBlockRowIterator() {
+    return new TsBlockRowIterator(0);
   }
 
   /** Only used for the batch data of vector time series. */
@@ -182,17 +191,24 @@ public class TsBlock {
     return new AlignedTsBlockIterator(0, subIndex);
   }
 
-  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+  private class TsBlockColumnIterator implements IPointReader, IBatchDataIterator {
+
+    protected int rowIndex;
+    protected int columnIndex;
 
-    protected int index;
+    public TsBlockColumnIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
+      this.columnIndex = 0;
+    }
 
-    public TsBlockIterator(int index) {
-      this.index = index;
+    public TsBlockColumnIterator(int rowIndex, int columnIndex) {
+      this.rowIndex = rowIndex;
+      this.columnIndex = columnIndex;
     }
 
     @Override
     public boolean hasNext() {
-      return index < positionCount;
+      return rowIndex < positionCount;
     }
 
     @Override
@@ -202,22 +218,22 @@ public class TsBlock {
 
     @Override
     public void next() {
-      index++;
+      rowIndex++;
     }
 
     @Override
     public long currentTime() {
-      return timeColumn.getLong(index);
+      return timeColumn.getLong(rowIndex);
     }
 
     @Override
     public Object currentValue() {
-      return valueColumns[0].getTsPrimitiveType(index).getValue();
+      return valueColumns[columnIndex].getTsPrimitiveType(rowIndex).getValue();
     }
 
     @Override
     public void reset() {
-      index = 0;
+      rowIndex = 0;
     }
 
     @Override
@@ -240,14 +256,44 @@ public class TsBlock {
     @Override
     public TimeValuePair currentTimeValuePair() {
       return new TimeValuePair(
-          timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+          timeColumn.getLong(rowIndex), valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
     }
 
     @Override
     public void close() {}
   }
 
-  private class AlignedTsBlockIterator extends TsBlockIterator {
+  public class TsBlockRowIterator implements Iterator<Object[]> {
+
+    protected int rowIndex;
+    protected int columnCount;
+
+    public TsBlockRowIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
+      columnCount = getValueColumnCount();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return rowIndex < positionCount;
+    }
+
+    @Override
+    public Object[] next() {
+      int columnCount = getValueColumnCount();
+      Object[] row = new Object[columnCount + 1];
+      for (int i = 0; i < columnCount; ++i) {
+        row[i] = valueColumns[i].getObject(rowIndex);
+      }
+      row[columnCount] = timeColumn.getObject(rowIndex);
+
+      rowIndex++;
+
+      return row;
+    }
+  }
+
+  private class AlignedTsBlockIterator extends TsBlockColumnIterator {
 
     private final int subIndex;
 
@@ -277,7 +323,7 @@ public class TsBlock {
 
     @Override
     public Object currentValue() {
-      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
       return v == null ? null : v.getValue();
     }
 
@@ -286,12 +332,12 @@ public class TsBlock {
       // aligned timeseries' BatchData length() may return the length of time column
       // we need traverse to VectorBatchDataIterator calculate the actual value column's length
       int cnt = 0;
-      int indexSave = index;
+      int indexSave = rowIndex;
       while (hasNext()) {
         cnt++;
         next();
       }
-      index = indexSave;
+      rowIndex = indexSave;
       return cnt;
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 8828393a68..9d95c4edcc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -74,6 +74,11 @@ public class BinaryColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getBinary(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 54544d3650..66ef186690 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -73,6 +73,11 @@ public class BooleanColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getBoolean(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index adc06a1f52..67e5d2f601 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -53,6 +53,11 @@ public interface Column {
     throw new UnsupportedOperationException(getClass().getName());
   }
 
+  /** Gets an Object at {@code position}. */
+  default Object getObject(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
   /** Gets a TsPrimitiveType at {@code position}. */
   default TsPrimitiveType getTsPrimitiveType(int position) {
     throw new UnsupportedOperationException(getClass().getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 32809b02f6..7abfa05a8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -73,6 +73,11 @@ public class DoubleColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getDouble(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 51a2675dae..25bbe44fdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -72,6 +72,11 @@ public class FloatColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getFloat(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 0d48dd2133..49d3357c1f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -72,6 +72,11 @@ public class IntColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getInt(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 345e71d5bc..e3838d77e7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -72,6 +72,11 @@ public class LongColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getLong(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 39775002dd..e60d7ebc66 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -97,6 +97,12 @@ public class RunLengthEncodedColumn implements Column {
     return value.getBinary(position);
   }
 
+  @Override
+  public Object getObject(int position) {
+    checkReadablePosition(position);
+    return value.getObject(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index c46fbd2ea3..e80b670f84 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -62,6 +62,11 @@ public class TimeColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return getLong(position);
+  }
+
   @Override
   public boolean isNull(int position) {
     return false;


[iotdb] 01/03: Expression: tsBlockInputColumnIndex

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9a7199c8b18561a35c982c45c0d78a27fcaff0f5
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 10:20:49 2022 +0800

    Expression: tsBlockInputColumnIndex
---
 .../java/org/apache/iotdb/db/query/expression/Expression.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index fc6a1d6e0c..619167bf96 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -46,8 +46,11 @@ import java.util.Set;
 public abstract class Expression {
 
   private String expressionStringCache;
+
   protected Boolean isConstantOperandCache = null;
 
+  protected Integer tsBlockInputColumnIndex = null;
+
   public boolean isBuiltInAggregationFunctionExpression() {
     return false;
   }
@@ -60,6 +63,14 @@ public abstract class Expression {
     return false;
   }
 
+  public Integer getTsBlockInputColumnIndex() {
+    return tsBlockInputColumnIndex;
+  }
+
+  public void setTsBlockInputColumnIndex(Integer tsBlockInputColumnIndex) {
+    this.tsBlockInputColumnIndex = tsBlockInputColumnIndex;
+  }
+
   public abstract void concat(
       List<PartialPath> prefixPaths,
       List<Expression> resultExpressions,


[iotdb] 03/03: bind expr with input column index in another way

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 14491af4dfe18a4118f184cbef41f198b757f103
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 21:00:36 2022 +0800

    bind expr with input column index in another way
---
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 37 ++++---------
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  3 +-
 .../iotdb/db/query/expression/Expression.java      | 15 ++----
 .../query/expression/binary/BinaryExpression.java  | 14 +++--
 .../db/query/expression/unary/ConstantOperand.java |  8 ++-
 .../query/expression/unary/FunctionExpression.java | 26 ++++++----
 .../query/expression/unary/LogicNotExpression.java | 17 ++++--
 .../query/expression/unary/NegationExpression.java | 11 +++-
 .../query/expression/unary/TimeSeriesOperand.java  | 10 +++-
 .../db/query/udf/core/executor/UDTFContext.java    | 60 ++++++++++++++++++++++
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |  9 +++-
 .../query/udf/core/layer/RawQueryInputLayer.java   |  4 ++
 12 files changed, 156 insertions(+), 58 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 976bfec456..e1c9dbadb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -43,15 +41,14 @@ import java.util.Set;
 
 public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
-  protected final ZoneId zoneId;
+  protected final UDTFContext udtfContext;
 
-  protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
-  protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
-  protected Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
+  protected final Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
+  protected final Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
 
   public UDTFPlan(ZoneId zoneId) {
     super();
-    this.zoneId = zoneId;
+    udtfContext = new UDTFContext(zoneId);
     setOperatorType(Operator.OperatorType.UDTF);
   }
 
@@ -128,35 +125,23 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
   @Override
   public void constructUdfExecutors(List<ResultColumn> resultColumns) {
-    for (ResultColumn resultColumn : resultColumns) {
-      resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
-    }
+    udtfContext.constructUdfExecutors(resultColumns);
   }
 
   @Override
   public void finalizeUDFExecutors(long queryId) {
-    try {
-      for (UDTFExecutor executor : expressionName2Executor.values()) {
-        executor.beforeDestroy();
-      }
-    } finally {
-      UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
-    }
+    udtfContext.finalizeUDFExecutors(queryId);
   }
 
   public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) {
     return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex));
   }
 
-  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
-    return expressionName2Executor.get(functionExpression.getExpressionString());
-  }
-
-  public int getReaderIndex(String pathName) {
-    return pathNameToReaderIndex.get(pathName);
+  public Integer getReaderIndexByExpressionName(String expressionName) {
+    return pathNameToReaderIndex.get(expressionName);
   }
 
-  public int getReaderIndexByExpressionName(String expressionName) {
-    return pathNameToReaderIndex.get(expressionName);
+  public UDTFContext getUdtfContext() {
+    return udtfContext;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 1fe27e7c14..c299a355f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
     initDataSetFields();
   }
 
-  protected UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
+  public UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
       throws QueryProcessException, IOException {
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
@@ -123,6 +123,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
                   udtfPlan,
                   rawQueryInputLayer,
                   UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+              .bindInputLayerColumnIndexWithExpression()
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
               .setDataSetResultColumnDataTypes()
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 619167bf96..48f231c8fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -49,7 +50,7 @@ public abstract class Expression {
 
   protected Boolean isConstantOperandCache = null;
 
-  protected Integer tsBlockInputColumnIndex = null;
+  protected Integer inputColumnIndex = null;
 
   public boolean isBuiltInAggregationFunctionExpression() {
     return false;
@@ -63,14 +64,6 @@ public abstract class Expression {
     return false;
   }
 
-  public Integer getTsBlockInputColumnIndex() {
-    return tsBlockInputColumnIndex;
-  }
-
-  public void setTsBlockInputColumnIndex(Integer tsBlockInputColumnIndex) {
-    this.tsBlockInputColumnIndex = tsBlockInputColumnIndex;
-  }
-
   public abstract void concat(
       List<PartialPath> prefixPaths,
       List<Expression> resultExpressions,
@@ -94,11 +87,13 @@ public abstract class Expression {
   public abstract void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
 
+  public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan);
+
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
   public abstract IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index b706a52f5d..18eca2151b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -205,6 +206,13 @@ public abstract class BinaryExpression extends Expression {
     rightExpression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -215,7 +223,7 @@ public abstract class BinaryExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -227,7 +235,7 @@ public abstract class BinaryExpression extends Expression {
       IntermediateLayer leftParentIntermediateLayer =
           leftExpression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
@@ -235,7 +243,7 @@ public abstract class BinaryExpression extends Expression {
       IntermediateLayer rightParentIntermediateLayer =
           rightExpression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0d32cfbad5..e8e0088599 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.ConstantIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
@@ -102,6 +103,11 @@ public class ConstantOperand extends Expression {
     // Do nothing
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    // Do nothing
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     // Do nothing
@@ -110,7 +116,7 @@ public class ConstantOperand extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 564ce5bb09..3683ff50fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -246,6 +247,14 @@ public class FunctionExpression extends Expression {
     expressionName2Executor.put(expressionString, new UDTFExecutor(this, zoneId));
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    for (Expression expression : expressions) {
+      expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    }
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     for (Expression expression : expressions) {
@@ -257,7 +266,7 @@ public class FunctionExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -269,13 +278,12 @@ public class FunctionExpression extends Expression {
       if (isBuiltInAggregationFunctionExpression) {
         transformer =
             new TransparentTransformer(
-                rawTimeSeriesInputLayer.constructPointReader(
-                    udtfPlan.getReaderIndexByExpressionName(toString())));
+                rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex));
       } else {
         IntermediateLayer udfInputIntermediateLayer =
             constructUdfInputIntermediateLayer(
                 queryId,
-                udtfPlan,
+                udtfContext,
                 rawTimeSeriesInputLayer,
                 expressionIntermediateLayerMap,
                 expressionDataTypeMap,
@@ -283,7 +291,7 @@ public class FunctionExpression extends Expression {
         transformer =
             constructUdfTransformer(
                 queryId,
-                udtfPlan,
+                udtfContext,
                 expressionDataTypeMap,
                 memoryAssigner,
                 udfInputIntermediateLayer);
@@ -303,7 +311,7 @@ public class FunctionExpression extends Expression {
 
   private IntermediateLayer constructUdfInputIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -314,7 +322,7 @@ public class FunctionExpression extends Expression {
       intermediateLayers.add(
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
@@ -333,12 +341,12 @@ public class FunctionExpression extends Expression {
 
   private UDFQueryTransformer constructUdfTransformer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       Map<Expression, TSDataType> expressionDataTypeMap,
       LayerMemoryAssigner memoryAssigner,
       IntermediateLayer udfInputIntermediateLayer)
       throws QueryProcessException, IOException {
-    UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+    UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this);
 
     executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 66893a0172..33fb9cb5a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -39,7 +40,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class LogicNotExpression extends Expression {
   protected Expression expression;
@@ -127,6 +132,12 @@ public class LogicNotExpression extends Expression {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -136,7 +147,7 @@ public class LogicNotExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -148,7 +159,7 @@ public class LogicNotExpression extends Expression {
       IntermediateLayer parentLayerPointReader =
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 04065daa55..5b8dccf1ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -132,6 +133,12 @@ public class NegationExpression extends Expression {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -141,7 +148,7 @@ public class NegationExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -153,7 +160,7 @@ public class NegationExpression extends Expression {
       IntermediateLayer parentLayerPointReader =
           expression.constructIntermediateLayer(
               queryId,
-              udtfPlan,
+              udtfContext,
               rawTimeSeriesInputLayer,
               expressionIntermediateLayerMap,
               expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a9c1052c01..adcb7dff6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -116,6 +117,11 @@ public class TimeSeriesOperand extends Expression {
     // nothing to do
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+    inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     memoryAssigner.increaseExpressionReference(this);
@@ -124,7 +130,7 @@ public class TimeSeriesOperand extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
-      UDTFPlan udtfPlan,
+      UDTFContext udtfContext,
       RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       Map<Expression, TSDataType> expressionDataTypeMap,
@@ -134,7 +140,7 @@ public class TimeSeriesOperand extends Expression {
       float memoryBudgetInMB = memoryAssigner.assign();
 
       LayerPointReader parentLayerPointReader =
-          rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+          rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
       expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
 
       expressionIntermediateLayerMap.put(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
new file mode 100644
index 0000000000..cb7d467403
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.executor;
+
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UDTFContext {
+
+  protected final ZoneId zoneId;
+
+  protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
+
+  public UDTFContext(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+
+  public void constructUdfExecutors(List<ResultColumn> resultColumns) {
+    for (ResultColumn resultColumn : resultColumns) {
+      resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
+    }
+  }
+
+  public void finalizeUDFExecutors(long queryId) {
+    try {
+      for (UDTFExecutor executor : expressionName2Executor.values()) {
+        executor.beforeDestroy();
+      }
+    } finally {
+      UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
+    }
+  }
+
+  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+    return expressionName2Executor.get(functionExpression.getExpressionString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 184753489d..270482b1e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -69,6 +69,13 @@ public class DAGBuilder {
     expressionDataTypeMap = new HashMap<>();
   }
 
+  public DAGBuilder bindInputLayerColumnIndexWithExpression() {
+    for (Expression expression : resultColumnExpressions) {
+      expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+    }
+    return this;
+  }
+
   public DAGBuilder buildLayerMemoryAssigner() {
     for (Expression expression : resultColumnExpressions) {
       expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -83,7 +90,7 @@ public class DAGBuilder {
           resultColumnExpressions[i]
               .constructIntermediateLayer(
                   queryId,
-                  udtfPlan,
+                  udtfPlan.getUdtfContext(),
                   rawTimeSeriesInputLayer,
                   expressionIntermediateLayerMap,
                   expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index d462da438c..fec8fd71b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -94,6 +94,10 @@ public class RawQueryInputLayer {
     rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
   }
 
+  public int getInputColumnCount() {
+    return dataTypes.length;
+  }
+
   public LayerPointReader constructPointReader(int columnIndex) {
     return new InputLayerPointReader(columnIndex);
   }