You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/25 12:07:13 UTC

[iotdb] branch exp_code_gen updated: [IOTDB-4470] Code generation to accelerate expression calculation (#7380)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch exp_code_gen
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/exp_code_gen by this push:
     new 10bda066dc [IOTDB-4470] Code generation to accelerate expression calculation (#7380)
10bda066dc is described below

commit 10bda066dcc07d1f4cc24ccfb7f89c415019dc69
Author: flashzxi <39...@users.noreply.github.com>
AuthorDate: Tue Oct 25 20:07:06 2022 +0800

    [IOTDB-4470] Code generation to accelerate expression calculation (#7380)
---
 nohup.out                                          |  33 ++
 pom.xml                                            |   7 +
 server/pom.xml                                     |   4 +
 .../operator/process/FilterAndProjectOperator.java |  70 +++-
 .../operator/process/codegen/CodegenContext.java   | 239 +++++++++++++
 .../operator/process/codegen/CodegenEvaluator.java |  29 ++
 .../process/codegen/CodegenEvaluatorImpl.java      | 296 ++++++++++++++++
 .../operator/process/codegen/CodegenVisitor.java   | 376 +++++++++++++++++++++
 .../expressionnode/BetweenExpressionNode.java      |  66 ++++
 .../expressionnode/BinaryExpressionNode.java       |  72 ++++
 .../codegen/expressionnode/CodeExpressionNode.java |  41 +++
 .../expressionnode/ConstantExpressionNode.java     |  42 +++
 .../codegen/expressionnode/ExpressionNode.java     |  44 +++
 .../codegen/expressionnode/ExpressionNodeImpl.java |  39 +++
 .../expressionnode/FunctionExpressionNode.java     |  84 +++++
 .../expressionnode/IdentityExpressionNode.java     |  39 +++
 .../codegen/expressionnode/InExpressionNode.java   |  63 ++++
 .../expressionnode/IsNullExpressionNode.java       |  46 +++
 .../codegen/expressionnode/LeafExpressionNode.java |  42 +++
 .../codegen/expressionnode/NewExpressionNode.java  |  50 +++
 .../expressionnode/ReturnValueExpressionNode.java  |  59 ++++
 .../expressionnode/UnaryExpressionNode.java        |  50 +++
 .../codegen/statements/AssignmentStatement.java    |  76 +++++
 .../codegen/statements/DeclareStatement.java       |  44 +++
 .../process/codegen/statements/IfStatement.java    |  85 +++++
 .../codegen/statements/MethodCallStatement.java    |  54 +++
 .../codegen/statements/ReturnStatement.java        |  35 ++
 .../process/codegen/statements/Statement.java      |  24 ++
 .../process/codegen/statements/StatementBlock.java |  44 +++
 .../statements/UDTFAssignmentStatement.java        |  81 +++++
 .../codegen/statements/UpdateRowStatement.java     |  61 ++++
 .../process/codegen/statements/WhileStatement.java |  54 +++
 .../codegen/utils/CodeGenEvaluatorBaseClass.java   | 119 +++++++
 .../process/codegen/utils/CodegenSimpleRow.java    | 102 ++++++
 .../plan/expression/binary/BinaryExpression.java   |   4 +
 .../plan/expression/ternary/BetweenExpression.java |   4 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  24 ++
 37 files changed, 2599 insertions(+), 3 deletions(-)

diff --git a/nohup.out b/nohup.out
new file mode 100644
index 0000000000..7f92b72381
--- /dev/null
+++ b/nohup.out
@@ -0,0 +1,33 @@
+---------------------
+Starting IoTDB DataNode
+---------------------
+./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../conf/datanode-env.sh: line 27: ulimit: open files: cannot modify limit: Operation not permitted
+Warning: Failed to set max number of files to be 65535, maybe you need to use 'sudo ulimit -n 65535' to set it when you use iotdb in production environments.
+WARN:
+WARN: the value of net.core.somaxconn (=128) is too small, please set it to a larger value using the following command.
+WARN:     sudo sysctl -w net.core.somaxconn=65535
+WARN: The original net.core.somaxconn value will be set back when the os reboots.
+WARN:
+setting local JMX...
+Maximum memory allocation pool = 40160MB, initial memory allocation pool = 3200MB
+If you want to change this configuration, please check conf/datanode-env.sh.
+2022-09-27 10:01:05,063 [main] INFO  o.a.i.d.c.IoTDBDescriptor:194 - Start to read config file file:./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../conf/iotdb-datanode.properties 
+2022-09-27 10:01:05,081 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1589 - allocateMemoryForRead = 11229619814 
+2022-09-27 10:01:05,081 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1590 - allocateMemoryForWrite = 14972826419 
+2022-09-27 10:01:05,081 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1591 - allocateMemoryForSchema = 3743206604 
+2022-09-27 10:01:05,082 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1704 - allocateMemoryForSchemaRegion = 2994565283 
+2022-09-27 10:01:05,082 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1707 - allocateMemoryForSchemaCache = 374320660 
+2022-09-27 10:01:05,082 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1711 - allocateMemoryForPartitionCache = 0 
+2022-09-27 10:01:05,082 [main] INFO  o.a.i.d.c.IoTDBDescriptor:1714 - allocateMemoryForLastCache = 374320660 
+2022-09-27 10:01:05,085 [main] INFO  o.a.i.t.c.c.TSFileDescriptor:129 - try loading iotdb-datanode.properties from /home/zx/workspace/iotdb/./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../conf/iotdb-datanode.properties 
+2022-09-27 10:01:05,094 [main] INFO  o.a.i.d.c.IoTDBDescriptor:367 - IoTDB enable memory control: true 
+2022-09-27 10:01:05,140 [main] INFO  o.a.i.m.c.MetricConfigDescriptor:54 - Start to read config file ./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../conf/iotdb-metric.yml 
+2022-09-27 10:01:05,189 [main] INFO  o.a.i.d.c.IoTDBStartCheck:132 - Starting IoTDB 0.14.0-SNAPSHOT (Build: 3377c5d) 
+2022-09-27 10:01:05,190 [main] INFO  o.a.i.d.c.IoTDBStartCheck:141 -  ./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../data/system/schema dir has been created. 
+2022-09-27 10:01:05,191 [main] INFO  o.a.i.d.c.IoTDBStartCheck:201 -  /home/zx/workspace/iotdb/./server/target/iotdb-server-0.14.0-SNAPSHOT/sbin/../data/system/schema/system.properties has been created. 
+2022-09-27 10:01:05,193 [main] INFO  o.a.i.d.s.DataNodeServerCommandLine:85 - Running mode -s 
+2022-09-27 10:01:05,195 [main] WARN  o.a.i.c.s.StartupChecks:38 - iotdb.jmx.port missing from datanode-env.sh(Unix or OS X, if you use Windows, check conf/datanode-env.bat) 
+2022-09-27 10:01:05,196 [main] INFO  o.a.i.c.s.StartupChecks:56 - JDK version is 8. 
+2022-09-27 10:01:05,200 [main] INFO  o.a.i.d.c.ConfigNodeInfo:94 - Successfully update ConfigNode: [TEndPoint(ip:127.0.0.1, port:22277)]. 
+2022-09-27 10:01:05,200 [main] INFO  o.a.i.db.service.DataNode:173 - start registering to the cluster. 
+2022-09-27 10:01:05,259 [main] WARN  o.a.i.db.service.DataNode:224 - Cannot register to the cluster, because: Fail to connect to any config node. Please check server it 
diff --git a/pom.xml b/pom.xml
index 44f42d20ad..5f71c4d13a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,6 +184,8 @@
         <openapi.generator.version>5.0.0</openapi.generator.version>
         <!-- cli -->
         <progressbar.version>0.9.3</progressbar.version>
+        <!-- expression code gen -->
+        <janino.version>3.1.7</janino.version>
         <!-- for java 11-->
         <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
         <log4j.version>1.2.19</log4j.version>
@@ -423,6 +425,11 @@
                 <artifactId>jackson-mapper-asl</artifactId>
                 <version>${jackson-mapper-asl.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.codehaus.janino</groupId>
+                <artifactId>janino</artifactId>
+                <version>${janino.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.glassfish.jaxb</groupId>
                 <artifactId>jaxb-runtime</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index 9d29b53919..a174b4ade9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -88,6 +88,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>units</artifactId>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index eed0895a47..2df7a4b108 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.CodegenContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.CodegenEvaluator;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.CodegenEvaluatorImpl;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.binary.BinaryColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
@@ -65,6 +68,13 @@ public class FilterAndProjectOperator implements ProcessOperator {
 
   // false when we only need to do projection
   private final boolean hasFilter;
+  private boolean codegenSuccess;
+
+  private boolean hasCodegenEvaluatedCache;
+
+  private Column[] codegenEvaluatedColumns;
+
+  private CodegenEvaluator codegenEvaluator;
 
   public FilterAndProjectOperator(
       OperatorContext operatorContext,
@@ -87,6 +97,42 @@ public class FilterAndProjectOperator implements ProcessOperator {
     this.hasNonMappableUDF = hasNonMappableUDF;
     this.filterTsBlockBuilder = new TsBlockBuilder(8, filterOutputDataTypes);
     this.hasFilter = hasFilter;
+    codegenSuccess = false;
+  }
+
+  /**
+   * Creates an operator that additionally tries to evaluate its output columns through generated
+   * code.
+   *
+   * <p>All transformer and flag arguments are stored as given. Once the fields are set, code
+   * generation is attempted for {@code codegenContext}; the outcome is recorded by {@link
+   * #tryCodegen(CodegenContext)}.
+   *
+   * @param codegenContext description of the expressions to generate code for
+   * @param filterOutputDataTypes data types used to size the filter {@link TsBlockBuilder}
+   */
+  public FilterAndProjectOperator(
+      CodegenContext codegenContext,
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> filterOutputDataTypes,
+      List<LeafColumnTransformer> filterLeafColumnTransformerList,
+      ColumnTransformer filterOutputTransformer,
+      List<ColumnTransformer> commonTransformerList,
+      List<LeafColumnTransformer> projectLeafColumnTransformerList,
+      List<ColumnTransformer> projectOutputTransformerList,
+      boolean hasNonMappableUDF,
+      boolean hasFilter) {
+    // execution context and upstream operator
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+
+    // filter side
+    this.hasFilter = hasFilter;
+    this.filterLeafColumnTransformerList = filterLeafColumnTransformerList;
+    this.filterOutputTransformer = filterOutputTransformer;
+    this.filterTsBlockBuilder = new TsBlockBuilder(8, filterOutputDataTypes);
+
+    // projection side
+    this.commonTransformerList = commonTransformerList;
+    this.projectLeafColumnTransformerList = projectLeafColumnTransformerList;
+    this.projectOutputTransformerList = projectOutputTransformerList;
+    this.hasNonMappableUDF = hasNonMappableUDF;
+
+    // must run last: it records whether the generated evaluator is usable
+    tryCodegen(codegenContext);
+  }
+
+  /**
+   * Tries to build a generated evaluator for {@code codegenContext}.
+   *
+   * <p>On success the evaluator is stored and {@code codegenSuccess} is set, so that {@code
+   * tryGetColumnByCodegen} serves columns from it. On any failure the operator keeps no reference
+   * to the unusable evaluator and {@code codegenSuccess} stays {@code false}; callers then fall
+   * back to the regular column transformers.
+   *
+   * @param codegenContext description of the expressions to generate code for
+   */
+  private void tryCodegen(CodegenContext codegenContext) {
+    codegenSuccess = false;
+    codegenEvaluator = null;
+
+    CodegenEvaluator evaluator = new CodegenEvaluatorImpl(codegenContext);
+    try {
+      evaluator.generateEvaluatorClass();
+    } catch (Exception ignored) {
+      // Deliberately best effort: code generation is only an optimisation, so a failure to
+      // generate or compile must not fail the query. The interpreted path is used instead.
+      return;
+    }
+
+    // only publish the evaluator once it is known to be usable
+    codegenEvaluator = evaluator;
+    codegenSuccess = true;
+  }
 
   @Override
@@ -183,15 +229,33 @@ public class FilterAndProjectOperator implements ProcessOperator {
       leafColumnTransformer.initFromTsBlock(input);
     }
 
+    hasCodegenEvaluatedCache = false;
     List<Column> resultColumns = new ArrayList<>();
-    for (ColumnTransformer columnTransformer : projectOutputTransformerList) {
-      columnTransformer.tryEvaluate();
-      resultColumns.add(columnTransformer.getColumn());
+    for (int i = 0; i < projectOutputTransformerList.size(); ++i) {
+      Column outputColumn;
+      outputColumn = tryGetColumnByCodegen(input, i);
+      if (outputColumn == null) {
+        ColumnTransformer columnTransformer = projectOutputTransformerList.get(i);
+        columnTransformer.tryEvaluate();
+        outputColumn = columnTransformer.getColumn();
+      }
+      resultColumns.add(outputColumn);
     }
     return TsBlock.wrapBlocksWithoutCopy(
         positionCount, originTimeColumn, resultColumns.toArray(new Column[0]));
   }
 
+  /**
+   * Returns the {@code i}-th column produced by the generated evaluator, or {@code null} when code
+   * generation did not succeed.
+   *
+   * <p>The evaluator is run at most once per cache period: the first call after {@code
+   * hasCodegenEvaluatedCache} was cleared evaluates {@code input} and caches all columns, later
+   * calls only index into that cache.
+   */
+  private Column tryGetColumnByCodegen(TsBlock input, int i) {
+    if (!codegenSuccess) {
+      return null;
+    }
+    if (!hasCodegenEvaluatedCache) {
+      codegenEvaluatedColumns = codegenEvaluator.evaluate(input);
+      hasCodegenEvaluatedCache = true;
+    }
+    return codegenEvaluatedColumns[i];
+  }
+
   @Override
   public boolean hasNext() {
     return inputOperator.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenContext.java
new file mode 100644
index 0000000000..c38468f031
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenContext.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen;
+
+import org.apache.iotdb.db.mpp.common.NodeRef;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.AssignmentStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.DeclareStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils.CodegenSimpleRow;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CodegenContext {
+  private List<DeclareStatement> intermediateVariables;
+  private List<AssignmentStatement> assignmentStatements;
+  private final Map<String, List<InputLocation>> inputLocations;
+  private final List<TSDataType> inputDataTypes;
+  private Map<String, String> inputNameToVarName;
+  private final List<Expression> outputExpression;
+  private Map<Expression, ExpressionNode> expressionToNode;
+  private List<UDTFExecutor> udtfExecutors;
+  private List<CodegenSimpleRow> udtfRows;
+  private final Map<NodeRef<Expression>, TSDataType> expressionTypes;
+  private UDTFContext udtfContext;
+
+  private int udtfIndex = 0;
+
+  public void setUdtfContext(UDTFContext udtfContext) {
+    this.udtfContext = udtfContext;
+  }
+
+  public void setIsExpressionGeneratedSuccess(List<Boolean> isExpressionGeneratedSuccess) {
+    this.isExpressionGeneratedSuccess = isExpressionGeneratedSuccess;
+  }
+
+  private List<Boolean> isExpressionGeneratedSuccess;
+
+  private long uniqueIndex;
+
+  public CodegenContext(
+      Map<String, List<InputLocation>> inputLocations,
+      List<TSDataType> inputDataTypes,
+      List<Expression> outputExpressions,
+      Expression filterExpression,
+      Map<NodeRef<Expression>, TSDataType> expressionTypes) {
+    init();
+
+    this.inputLocations = inputLocations;
+    this.inputDataTypes = inputDataTypes;
+    this.outputExpression = outputExpressions;
+    this.expressionTypes = expressionTypes;
+  }
+
+  public void init() {
+    this.expressionToNode = new HashMap<>();
+    this.udtfRows = new ArrayList<>();
+    this.udtfExecutors = new ArrayList<>();
+    this.inputNameToVarName = new HashMap<>();
+    this.intermediateVariables = new ArrayList<>();
+    this.assignmentStatements = new ArrayList<>();
+  }
+
+  public void addInputVarNameMap(String inputName, String varName) {
+    inputNameToVarName.put(inputName, varName);
+  }
+
+  public String getVarName(String inputName) {
+    return inputNameToVarName.get(inputName);
+  }
+
+  public Map<String, String> getInputNameToVarName() {
+    return inputNameToVarName;
+  }
+
+  public boolean isExpressionExisted(Expression expression) {
+    return expressionToNode.containsKey(expression);
+  }
+
+  public void addExpression(Expression expression, ExpressionNode ExpressionNode) {
+    if (!expressionToNode.containsKey(expression)) {
+      expressionToNode.put(expression, ExpressionNode);
+    }
+  }
+
+  public Map<String, TSDataType> getOutputName2TypeMap() {
+    LinkedHashMap<String, TSDataType> outputName2TypeMap = new LinkedHashMap<>();
+    for (Expression expression : outputExpression) {
+      if (!expressionToNode.containsKey(expression)) {
+        outputName2TypeMap.put("non-existVariable", TSDataType.BOOLEAN);
+        continue;
+      }
+      outputName2TypeMap.put(
+          expressionToNode.get(expression).getNodeName(),
+          expressionTypes.get(NodeRef.of(expression)));
+    }
+    return outputName2TypeMap;
+  }
+
+  public ExpressionNode getExpressionNode(Expression expression) {
+    if (expressionToNode.containsKey(expression)) {
+      return expressionToNode.get(expression);
+    }
+    return null;
+  }
+
+  public void addUdtfExecutor(UDTFExecutor executor) {
+    udtfExecutors.add(executor);
+  }
+
+  public void addUdtfInput(CodegenSimpleRow input) {
+    udtfRows.add(input);
+  }
+
+  public String uniqueVarName() {
+    return "var" + (uniqueIndex++);
+  }
+
+  public String uniqueVarName(String prefix) {
+    return prefix + (uniqueIndex++);
+  }
+
+  public int getUdtfIndex() {
+    udtfIndex++;
+    return udtfIndex - 1;
+  }
+
+  public void addOutputExpr(Expression expression) {
+    outputExpression.add(expression);
+  }
+
+  public static Class<?> tsDatatypeToClass(TSDataType tsDataType) {
+    switch (tsDataType) {
+      case INT32:
+        return Integer.class;
+      case INT64:
+        return Long.class;
+      case FLOAT:
+        return Float.class;
+      case DOUBLE:
+        return Double.class;
+      case BOOLEAN:
+        return Boolean.class;
+      case TEXT:
+        return String.class;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported for codegen.", tsDataType));
+    }
+  }
+
+  public UDTFExecutor[] getUdtfExecutors() {
+    return udtfExecutors.toArray(new UDTFExecutor[0]);
+  }
+
+  public CodegenSimpleRow[] getUdtfRows() {
+    return udtfRows.toArray(new CodegenSimpleRow[0]);
+  }
+
+  public Map<String, List<InputLocation>> getInputLocations() {
+    return inputLocations;
+  }
+
+  public List<Expression> getOutputExpression() {
+    return outputExpression;
+  }
+
+  public List<TSDataType> getOutputDataTypes() {
+    return outputExpression.stream()
+        .map(expression -> expressionTypes.get(NodeRef.of(expression)))
+        .collect(Collectors.toList());
+  }
+
+  public boolean isExpressionInput(Expression expression) {
+    return inputLocations.containsKey(expression.getExpressionString());
+  }
+
+  public void addIntermediateVariable(DeclareStatement declareStatement) {
+    this.intermediateVariables.add(
+        new DeclareStatement("boolean", declareStatement.getVarName() + "IsNull"));
+    this.intermediateVariables.add(declareStatement);
+  }
+
+  public void addAssignmentStatement(AssignmentStatement assignmentStatement) {
+    this.assignmentStatements.add(assignmentStatement);
+  }
+
+  public List<DeclareStatement> getIntermediateVariables() {
+    return intermediateVariables;
+  }
+
+  public List<AssignmentStatement> getAssignmentStatements() {
+    return assignmentStatements;
+  }
+
+  public Map<NodeRef<Expression>, TSDataType> getExpressionTypes() {
+    return expressionTypes;
+  }
+
+  public List<TSDataType> getInputDataTypes() {
+    return inputDataTypes;
+  }
+
+  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+    return udtfContext.getExecutorByFunctionExpression(functionExpression);
+  }
+
+  public TSDataType inferType(Expression expression) {
+    return expressionTypes.get(NodeRef.of(expression));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluator.java
new file mode 100644
index 0000000000..2fdc74d63b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/** Evaluates output columns for a {@link TsBlock} by means of generated code. */
+public interface CodegenEvaluator {
+  /**
+   * Prepares the generated evaluator; must complete successfully before {@link
+   * #evaluate(TsBlock)} is used.
+   *
+   * @throws Exception if the evaluator cannot be generated
+   */
+  void generateEvaluatorClass() throws Exception;
+
+  /**
+   * Evaluates the given block.
+   *
+   * @param inputTsBlock the block to evaluate
+   * @return the resulting columns
+   */
+  Column[] evaluate(TsBlock inputTsBlock);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluatorImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluatorImpl.java
new file mode 100644
index 0000000000..11f7b2db92
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenEvaluatorImpl.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ConstantExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.IdentityExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ReturnValueExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.AssignmentStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.DeclareStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.IfStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.MethodCallStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils.CodeGenEvaluatorBaseClass;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * For all output expressions which can be dealt with codegen, we will generate a class extending
+ * {@link CodeGenEvaluatorBaseClass}. The generated code will implement several abstract method in
+ * {@link CodeGenEvaluatorBaseClass}, and declare all input variables and intermediate variables as
+ * its field.
+ *
+ * <p>for example: select (a + b) * c from root.sg.d1;
+ *
+ * <pre>{@code
+ * private int a;
+ * private boolean aIsNull;
+ * private float b;
+ * private boolean bIsNull;
+ * private double c;
+ * private boolean cIsNull;
+ * private double var1;
+ * private boolean var1IsNull;
+ * private double var2;
+ * private boolean var2IsNull;
+ *
+ * protected void updateInputVariables(int i){
+ *   if(valueColumns[0].isNull(i)){
+ *     aIsNull = true;
+ *   } else {
+ *     a = valueColumns[0].getInt(i);
+ *     aIsNull = false;
+ *   }
+ *   if(valueColumns[1].isNull(i)){
+ *     bIsNull = true;
+ *   } else {
+ *     b = valueColumns[1].getFloat(i);
+ *     bIsNull = false;
+ *   }
+ *   if(valueColumns[2].isNull(i)){
+ *     cIsNull = true;
+ *   } else {
+ *     c = valueColumns[2].getDouble(i);
+ *     cIsNull = false;
+ *   }
+ * }
+ *
+ * protected void evaluateByRow(int i){
+ *   updateInputVariables(i);
+ *   if(aIsNull || bIsNull){
+ *     var1IsNull = true;
+ *   } else {
+ *     var1IsNull = false;
+ *     var1 = a+b;
+ *   }
+ *   if(var1IsNull || cIsNull){
+ *     var2IsNull = true;
+ *   }else{
+ *     var2 = var1 * c;
+ *     var2IsNull = false;
+ *   }
+ *   if(var2IsNull){
+ *     outputColumns[0].appendNull();
+ *   }else{
+ *     outputColumns[0].writeDouble(var2);
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>The generated class therefore overrides two methods: {@code
+ * CodeGenEvaluatorBaseClass#evaluateByRow(int)} and {@code
+ * CodeGenEvaluatorBaseClass#updateInputVariables(int)}.
+ */
+public class CodegenEvaluatorImpl implements CodegenEvaluator {
+  private final CodegenVisitor codegenVisitor;
+  // one flag per output expression, in output order: the result of visiting that expression
+  private final List<Boolean> generatedSuccess;
+  private final CodegenContext codegenContext;
+  // true once the generated class has been compiled and instantiated
+  private boolean scriptFinished;
+  private CodeGenEvaluatorBaseClass codegenEvaluator;
+
+  IClassBodyEvaluator classBodyEvaluator;
+
+  public CodegenEvaluatorImpl(CodegenContext codegenContext) {
+    codegenVisitor = new CodegenVisitor();
+    generatedSuccess = new ArrayList<>();
+    this.codegenContext = codegenContext;
+    scriptFinished = false;
+  }
+
+  /**
+   * Builds the body of the generated class: field declarations, then {@code
+   * updateInputVariables(int)}, then {@code evaluateByRow(int)}.
+   */
+  private String constructCode() {
+    // must run first: visiting fills the context with the variables and assignments used below
+    parseOutputExpressions();
+
+    StringBuilder code = new StringBuilder();
+    generateFieldDeclareStatements(code);
+    generateUpdateInputVariables(code);
+    generateEvaluateRow(code);
+    return code.toString();
+  }
+
+  /** Emits one private field per declaration registered in the context (values and null flags). */
+  private void generateFieldDeclareStatements(StringBuilder code) {
+    for (DeclareStatement declareStatement : codegenContext.getIntermediateVariables()) {
+      code.append("private ").append(declareStatement.toCode());
+    }
+  }
+
+  /**
+   * Visits every output expression, which registers variable declarations and assignments in the
+   * context, and records per expression whether the visit succeeded.
+   */
+  private void parseOutputExpressions() {
+    // start from a clean slate so that a retry after a failed attempt cannot leave stale flags
+    // in front of the fresh ones and shift the per-output positions
+    generatedSuccess.clear();
+    for (Expression expression : codegenContext.getOutputExpression()) {
+      generatedSuccess.add(codegenVisitor.visitExpression(expression, codegenContext));
+    }
+
+    codegenContext.setIsExpressionGeneratedSuccess(generatedSuccess);
+  }
+
+  /**
+   * Emits {@code updateInputVariables(int)}, which loads row {@code i} of every input column into
+   * its variable and null flag.
+   */
+  private void generateUpdateInputVariables(StringBuilder code) {
+    // variable names of the inputs, ordered by value column index
+    List<String> parameterNames = getParameterNames();
+
+    code.append("protected void updateInputVariables(int i){\n");
+
+    // NOTE(review): the list position is used both as the valueColumns index and as the index
+    // into the input data types; this presumes the column indices are 0..n-1 without gaps.
+    for (int index = 0; index < parameterNames.size(); ++index) {
+      String varName = parameterNames.get(index);
+      String nullFlag = varName + "IsNull";
+      String instanceName = "valueColumns[" + index + "]";
+      String methodName =
+          "get" + tsDatatypeToPrimaryType(codegenContext.getInputDataTypes().get(index));
+
+      IfStatement ifStatement = new IfStatement(true);
+      ifStatement.setCondition(new IdentityExpressionNode(instanceName + ".isNull(i)"));
+      ifStatement
+          .addIfBodyStatement(new AssignmentStatement(nullFlag, new ConstantExpressionNode("true")))
+          .addElseBodyStatement(
+              new AssignmentStatement(nullFlag, new ConstantExpressionNode("false")))
+          .addElseBodyStatement(
+              new AssignmentStatement(
+                  varName, new ReturnValueExpressionNode(instanceName, methodName, "i")));
+      code.append(ifStatement.toCode());
+    }
+    code.append("}\n");
+  }
+
+  /**
+   * Emits {@code evaluateByRow(int)}: refresh the inputs, run every registered assignment guarded
+   * by its null condition, then write each successfully generated output to its column builder.
+   */
+  private void generateEvaluateRow(StringBuilder code) {
+    code.append("protected void evaluateByRow(int i){\n");
+    code.append("updateInputVariables(i);\n");
+
+    for (AssignmentStatement assignmentStatement : codegenContext.getAssignmentStatements()) {
+      String nullFlag = assignmentStatement.getVarName() + "IsNull";
+
+      IfStatement ifStatement = new IfStatement(true);
+      ifStatement.setCondition(assignmentStatement.getNullCondition());
+      ifStatement
+          .addIfBodyStatement(new AssignmentStatement(nullFlag, new ConstantExpressionNode("true")))
+          .addElseBodyStatement(
+              new AssignmentStatement(nullFlag, new ConstantExpressionNode("false")))
+          .addElseBodyStatement(assignmentStatement);
+
+      code.append(ifStatement.toCode());
+    }
+
+    // Store the results in the column builders. The output expressions are walked directly, so
+    // that position i always refers to the same expression as generatedSuccess.get(i). A map
+    // keyed by variable name cannot be used for this: several outputs may share one key, which
+    // would shift all later positions.
+    List<Expression> outputExpressions = codegenContext.getOutputExpression();
+    for (int i = 0; i < outputExpressions.size(); i++) {
+      if (!generatedSuccess.get(i)) {
+        continue;
+      }
+      Expression expression = outputExpressions.get(i);
+      if (!codegenContext.isExpressionExisted(expression)) {
+        throw new IllegalStateException(
+            "No generated variable is registered for output expression " + i);
+      }
+
+      String outputVariable = codegenContext.getExpressionNode(expression).getNodeName();
+      String outputColumn = "outputColumns[" + i + "]";
+      String methodName = "write" + tsDatatypeToPrimaryType(codegenContext.inferType(expression));
+
+      IfStatement ifStatement = new IfStatement(true);
+      ifStatement.setCondition(new IdentityExpressionNode(outputVariable + "IsNull"));
+      ifStatement.addIfBodyStatement(new MethodCallStatement(outputColumn, "appendNull"));
+      ifStatement.addElseBodyStatement(
+          new MethodCallStatement(outputColumn, methodName, outputVariable));
+
+      code.append(ifStatement.toCode());
+    }
+    code.append("}");
+  }
+
+  /**
+   * Returns the suffix of the column accessor for a data type, e.g. {@code "Int"} for {@code
+   * getInt}/{@code writeInt}.
+   *
+   * @throws UnsupportedOperationException for any type other than INT32, INT64, FLOAT, DOUBLE and
+   *     BOOLEAN
+   */
+  private String tsDatatypeToPrimaryType(TSDataType tsDataType) {
+    switch (tsDataType) {
+      case INT32:
+        return "Int";
+      case INT64:
+        return "Long";
+      case FLOAT:
+        return "Float";
+      case DOUBLE:
+        return "Double";
+      case BOOLEAN:
+        return "Boolean";
+      default:
+        throw new UnsupportedOperationException(
+            "Data type " + tsDataType + " is not supported for codegen.");
+    }
+  }
+
+  /**
+   * Returns the generated variable name of every input, ordered by the value column index of the
+   * input's first location.
+   */
+  private List<String> getParameterNames() {
+    // the context maps input name -> locations; invert to column index -> input name
+    Map<Integer, String> columnToInputName =
+        codegenContext.getInputLocations().entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    entry -> entry.getValue().get(0).getValueColumnIndex(), Map.Entry::getKey));
+
+    // a TreeMap iterates in ascending column order
+    return new TreeMap<>(columnToInputName)
+        .values().stream().map(codegenContext::getVarName).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates the evaluator source, compiles it as a subclass of {@link
+   * CodeGenEvaluatorBaseClass} and instantiates it. Does nothing if that already succeeded.
+   *
+   * @throws Exception if generating, compiling or instantiating the class fails
+   */
+  @Override
+  public void generateEvaluatorClass() throws Exception {
+    if (scriptFinished) {
+      return;
+    }
+
+    codegenContext.init();
+    String code = constructCode();
+
+    classBodyEvaluator =
+        CompilerFactoryFactory.getDefaultCompilerFactory(
+                CodegenEvaluatorImpl.class.getClassLoader())
+            .newClassBodyEvaluator();
+
+    // set imports
+    classBodyEvaluator.setDefaultImports(
+        "org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils.CodegenSimpleRow",
+        "org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor");
+
+    classBodyEvaluator.setExtendedClass(CodeGenEvaluatorBaseClass.class);
+
+    classBodyEvaluator.cook(code);
+    // Class#newInstance is deprecated; go through the no-arg constructor instead
+    codegenEvaluator =
+        (CodeGenEvaluatorBaseClass)
+            classBodyEvaluator.getClazz().getDeclaredConstructor().newInstance();
+    codegenEvaluator.setOutputExpressionGenerateSuccess(generatedSuccess);
+    codegenEvaluator.setOutputDataTypes(codegenContext.getOutputDataTypes());
+    codegenEvaluator.setExecutors(codegenContext.getUdtfExecutors());
+    codegenEvaluator.setRows(codegenContext.getUdtfRows());
+
+    scriptFinished = true;
+  }
+
+  /** Delegates to the generated evaluator; {@link #generateEvaluatorClass()} must have succeeded. */
+  @Override
+  public Column[] evaluate(TsBlock inputTsBlock) {
+    return codegenEvaluator.evaluate(inputTsBlock);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenVisitor.java
new file mode 100644
index 0000000000..024ab9a399
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/CodegenVisitor.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.BetweenExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.BinaryExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ConstantExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.FunctionExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.IdentityExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.IsNullExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.LeafExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.UnaryExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.AssignmentStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.DeclareStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.UDTFAssignmentStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements.UpdateRowStatement;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils.CodegenSimpleRow;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.BinaryExpression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
+import org.apache.iotdb.db.mpp.plan.expression.unary.InExpression;
+import org.apache.iotdb.db.mpp.plan.expression.unary.IsNullExpression;
+import org.apache.iotdb.db.mpp.plan.expression.unary.LogicNotExpression;
+import org.apache.iotdb.db.mpp.plan.expression.unary.NegationExpression;
+import org.apache.iotdb.db.mpp.plan.expression.unary.UnaryExpression;
+import org.apache.iotdb.db.mpp.plan.expression.visitor.ExpressionVisitor;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy.AccessStrategyType.MAPPABLE_ROW_BY_ROW;
+
+public class CodegenVisitor extends ExpressionVisitor<Boolean, CodegenContext> {
+
+  // The single TimestampOperand registered in the context on behalf of every TimestampOperand:
+  // either the first one visited or one created while visiting a function expression.
+  private TimestampOperand globalTimestampOperand;
+
+  /** Creates a visitor that has not recorded a timestamp operand yet. */
+  public CodegenVisitor() {}
+
+  /**
+   * Entry point for turning an expression into code generation nodes.
+   *
+   * <p>Expressions whose inferred type is TEXT are rejected, expressions already known to the
+   * context are accepted as they are, input expressions are registered as leaf variables, and
+   * everything else is delegated to {@code process}.
+   *
+   * @return whether code could be generated for {@code expression}
+   */
+  @Override
+  public Boolean visitExpression(Expression expression, CodegenContext codegenContext) {
+    // don't support TEXT type now
+    if (TSDataType.TEXT == codegenContext.inferType(expression)) {
+      return false;
+    }
+    if (codegenContext.isExpressionExisted(expression)) {
+      return true;
+    }
+    if (!codegenContext.isExpressionInput(expression)) {
+      return process(expression, codegenContext);
+    }
+
+    // Input expression: register it under a fresh variable name and declare that variable.
+    String inputVarName = codegenContext.uniqueVarName("input");
+    codegenContext.addExpression(expression, new LeafExpressionNode(inputVarName));
+    codegenContext.addInputVarNameMap(expression.getExpressionString(), inputVarName);
+
+    TSDataType inputType = codegenContext.inferType(expression);
+    IdentityExpressionNode inputVariable = new IdentityExpressionNode(inputVarName);
+    codegenContext.addIntermediateVariable(createDeclareStatement(inputType, inputVariable));
+    return true;
+  }
+
+  /**
+   * Generates a boolean intermediate variable holding the logical NOT of the sub expression.
+   *
+   * @return {@code false} if no code could be generated for the sub expression, otherwise {@code
+   *     true}
+   */
+  @Override
+  public Boolean visitLogicNotExpression(
+      LogicNotExpression logicNotExpression, CodegenContext codegenContext) {
+    if (!visitExpression(logicNotExpression.getExpression(), codegenContext)) {
+      return false;
+    }
+
+    ExpressionNode subNode = codegenContext.getExpressionNode(logicNotExpression.getExpression());
+    UnaryExpressionNode notNode =
+        new UnaryExpressionNode(codegenContext.uniqueVarName(), subNode, "!");
+    codegenContext.addExpression(logicNotExpression, notNode);
+
+    // Declare and assign the NOT result itself. Using subNode here would re-declare the operand
+    // and leave the variable of notNode undeclared and never computed.
+    DeclareStatement boolDeclareStatement =
+        new DeclareStatement("boolean", notNode.getNodeName());
+    codegenContext.addIntermediateVariable(boolDeclareStatement);
+    codegenContext.addAssignmentStatement(new AssignmentStatement(notNode));
+    return true;
+  }
+
+  private DeclareStatement createDeclareStatement(
+      TSDataType tsDataType, ExpressionNode expressionNode) {
+    DeclareStatement statement;
+    switch (tsDataType) {
+      case INT32:
+        statement = new DeclareStatement("int", expressionNode.getNodeName());
+        break;
+      case INT64:
+        statement = new DeclareStatement("long", expressionNode.getNodeName());
+        break;
+      case FLOAT:
+        statement = new DeclareStatement("float", expressionNode.getNodeName());
+        break;
+      case DOUBLE:
+        statement = new DeclareStatement("double", expressionNode.getNodeName());
+        break;
+      case BOOLEAN:
+        statement = new DeclareStatement("boolean", expressionNode.getNodeName());
+        break;
+        //      case TEXT:
+        //        statement = new DeclareStatement("String",expressionNode.getNodeName(),
+        // expressionNode);
+        //        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported for expression codegen.", tsDataType));
+    }
+    return statement;
+  }
+
+  /**
+   * Generates an intermediate variable holding the arithmetic negation of the sub expression.
+   *
+   * @return {@code false} if no code could be generated for the sub expression, otherwise {@code
+   *     true}
+   * @throws UnSupportedDataTypeException if the inferred type of the negation is not INT32, INT64,
+   *     FLOAT or DOUBLE
+   */
+  @Override
+  public Boolean visitNegationExpression(
+      NegationExpression negationExpression, CodegenContext codegenContext) {
+    if (!visitExpression(negationExpression.getExpression(), codegenContext)) {
+      return false;
+    }
+
+    ExpressionNode operand = codegenContext.getExpressionNode(negationExpression.getExpression());
+    UnaryExpressionNode negationNode =
+        new UnaryExpressionNode(codegenContext.uniqueVarName(), operand, "-");
+
+    TSDataType resultType = codegenContext.inferType(negationExpression);
+    codegenContext.addExpression(negationExpression, negationNode);
+
+    // only numeric types can be negated
+    switch (resultType) {
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Data type %s is not supported for negationExpression codegen.", resultType));
+    }
+
+    codegenContext.addIntermediateVariable(createDeclareStatement(resultType, negationNode));
+    codegenContext.addAssignmentStatement(new AssignmentStatement(negationNode));
+    return true;
+  }
+
+  @Override
+  public Boolean visitBinaryExpression(
+      BinaryExpression binaryExpression, CodegenContext codegenContext) {
+    if (!visitExpression(binaryExpression.getRightExpression(), codegenContext)) {
+      return false;
+    }
+    if (!visitExpression(binaryExpression.getLeftExpression(), codegenContext)) {
+      return false;
+    }
+
+    ExpressionNode left = codegenContext.getExpressionNode(binaryExpression.getLeftExpression());
+    String op = binaryExpression.getOperator();
+    ExpressionNode right = codegenContext.getExpressionNode(binaryExpression.getRightExpression());
+
+    BinaryExpressionNode binaryExpressionNode =
+        new BinaryExpressionNode(codegenContext.uniqueVarName(), op, left, right);
+    codegenContext.addExpression(binaryExpression, binaryExpressionNode);
+
+    DeclareStatement declareStatement =
+        createDeclareStatement(codegenContext.inferType(binaryExpression), binaryExpressionNode);
+    codegenContext.addIntermediateVariable(declareStatement);
+    codegenContext.addAssignmentStatement(new AssignmentStatement(binaryExpressionNode));
+    return true;
+  }
+
+  /**
+   * Generates a boolean intermediate variable for an IS (NOT) NULL check of the sub expression.
+   *
+   * @return {@code false} if no code could be generated for the sub expression, otherwise {@code
+   *     true}
+   */
+  public Boolean visitIsNullExpression(
+      IsNullExpression isNullExpression, CodegenContext codegenContext) {
+    Expression checkedExpression = isNullExpression.getExpression();
+    if (!visitExpression(checkedExpression, codegenContext)) {
+      return false;
+    }
+
+    ExpressionNode checkedNode = codegenContext.getExpressionNode(checkedExpression);
+    String resultName = codegenContext.uniqueVarName();
+    IsNullExpressionNode resultNode =
+        new IsNullExpressionNode(resultName, checkedNode, isNullExpression.isNot());
+
+    codegenContext.addExpression(isNullExpression, resultNode);
+    DeclareStatement declaration = new DeclareStatement("boolean", resultNode.getNodeName());
+    codegenContext.addIntermediateVariable(declaration);
+    codegenContext.addAssignmentStatement(new AssignmentStatement(resultNode));
+    return true;
+  }
+
+  /**
+   * Generates an intermediate variable for a (NOT) BETWEEN expression.
+   *
+   * <p>The three operands are visited in order (value, lower bound, upper bound); generation stops
+   * at the first operand that cannot be handled.
+   *
+   * @return whether code could be generated for {@code betweenExpression}
+   */
+  @Override
+  public Boolean visitBetweenExpression(
+      BetweenExpression betweenExpression, CodegenContext codegenContext) {
+    boolean operandsGenerated =
+        visitExpression(betweenExpression.getFirstExpression(), codegenContext)
+            && visitExpression(betweenExpression.getSecondExpression(), codegenContext)
+            && visitExpression(betweenExpression.getThirdExpression(), codegenContext);
+    if (!operandsGenerated) {
+      return false;
+    }
+
+    boolean negated = betweenExpression.isNotBetween();
+
+    ExpressionNode valueNode =
+        codegenContext.getExpressionNode(betweenExpression.getFirstExpression());
+    ExpressionNode lowerBound =
+        codegenContext.getExpressionNode(betweenExpression.getSecondExpression());
+    ExpressionNode upperBound =
+        codegenContext.getExpressionNode(betweenExpression.getThirdExpression());
+
+    String resultName = codegenContext.uniqueVarName();
+    BetweenExpressionNode resultNode =
+        new BetweenExpressionNode(resultName, valueNode, lowerBound, upperBound, negated);
+    codegenContext.addExpression(betweenExpression, resultNode);
+
+    TSDataType resultType = codegenContext.inferType(betweenExpression);
+    codegenContext.addIntermediateVariable(createDeclareStatement(resultType, resultNode));
+    codegenContext.addAssignmentStatement(new AssignmentStatement(resultNode));
+    return true;
+  }
+
+  /**
+   * Registers a constant as a {@link ConstantExpressionNode} built from its value string, unless
+   * the context already knows the constant.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public Boolean visitConstantOperand(
+      ConstantOperand constantOperand, CodegenContext codegenContext) {
+    if (codegenContext.isExpressionExisted(constantOperand)) {
+      return true;
+    }
+    ConstantExpressionNode literalNode =
+        new ConstantExpressionNode(constantOperand.getValueString());
+    codegenContext.addExpression(constantOperand, literalNode);
+    return true;
+  }
+
+  /**
+   * Reports success without generating anything.
+   *
+   * <p>Time series are always treated as inputs, so this method should never be called.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public Boolean visitTimeSeriesOperand(
+      TimeSeriesOperand timeSeriesOperand, CodegenContext codegenContext) {
+    return true;
+  }
+
+  /**
+   * Registers the shared timestamp leaf node (named {@code timestamp}) in the context.
+   *
+   * <p>To avoid repeated TimestampOperands, the first operand seen becomes {@code
+   * globalTimestampOperand} and is the only one added to the context.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public Boolean visitTimeStampOperand(
+      TimestampOperand timestampOperand, CodegenContext codegenContext) {
+    // Fix the global operand first so the context is never queried with a null expression.
+    if (Objects.isNull(globalTimestampOperand)) {
+      globalTimestampOperand = timestampOperand;
+    }
+    if (!codegenContext.isExpressionExisted(globalTimestampOperand)) {
+      LeafExpressionNode timestamp = new LeafExpressionNode("timestamp");
+      codegenContext.addExpression(globalTimestampOperand, timestamp);
+    }
+    return true;
+  }
+
+  /**
+   * IN expressions are not supported by code generation yet.
+   *
+   * <p>The commented-out body below is a disabled draft of the intended implementation.
+   *
+   * @return always {@code false}
+   */
+  public Boolean visitInExpression(InExpression inExpression, CodegenContext codegenContext) {
+    //    if (!expressionVisitor(inExpression.getExpression())) {
+    //      return false;
+    //    }
+    //    ExpressionNode subExpressionNode =
+    //        codegenContext.getExpressionNode(inExpression.getExpression());
+    //    String setName = codegenContext.uniqueVarName();
+    //    NewSetStatement newSetStatement =
+    //        new NewSetStatement(
+    //            setName,
+    //            new ArrayList<>(inExpression.getValues()),
+    //            inExpression.getExpression().inferTypes(typeProvider));
+    //    codegenContext.addCode(newSetStatement);
+    //    InExpressionNode inExpressionNode =
+    //        new InExpressionNode(
+    //            codegenContext.uniqueVarName(), subExpressionNode, setName,
+    // inExpression.isNotIn());
+    //    BoolDeclareStatement boolDeclareStatement = new BoolDeclareStatement(inExpressionNode);
+    //    codegenContext.addExpression(inExpression, inExpressionNode, TSDataType.BOOLEAN);
+    //    codegenContext.addCode(boolDeclareStatement);
+    return false;
+  }
+
+  /**
+   * Generates the call of a UDTF through its {@link UDTFExecutor}.
+   *
+   * <p>Only executors whose access strategy type is {@code MAPPABLE_ROW_BY_ROW} are handled. For
+   * any other strategy, or when one of the argument expressions cannot be generated, {@code false}
+   * is returned.
+   *
+   * @return whether code could be generated for {@code functionExpression}
+   */
+  @Override
+  public Boolean visitFunctionExpression(
+      FunctionExpression functionExpression, CodegenContext codegenContext) {
+    UDTFExecutor executor = codegenContext.getExecutorByFunctionExpression(functionExpression);
+    if (executor.getConfigurations().getAccessStrategy().getAccessStrategyType()
+        != MAPPABLE_ROW_BY_ROW) {
+      return false;
+    }
+
+    // collect the argument types and generate every argument; give up on the first failure
+    List<TSDataType> inputDatatype = new ArrayList<>();
+    for (Expression expression : functionExpression.getExpressions()) {
+      inputDatatype.add(codegenContext.inferType(expression));
+      if (!visitExpression(expression, codegenContext)) {
+        return false;
+      }
+    }
+
+    // get UDTFExecutor of udtf
+    // NOTE(review): the index is read before addUdtfExecutor/addUdtfInput are called; presumably
+    // it is the slot both additions will occupy - confirm against CodegenContext
+    int udtfIndex = codegenContext.getUdtfIndex();
+    String executorName = "executors[" + udtfIndex + "]";
+    codegenContext.addUdtfExecutor(executor);
+
+    // generate a simpleRow of udtf
+    CodegenSimpleRow inputRow = new CodegenSimpleRow(inputDatatype.toArray(new TSDataType[0]));
+    String rowName = "rows[" + udtfIndex + "]";
+    codegenContext.addUdtfInput(inputRow);
+
+    FunctionExpressionNode functionExpressionNode =
+        new FunctionExpressionNode(
+            codegenContext.uniqueVarName(),
+            executorName,
+            rowName,
+            codegenContext.inferType(functionExpression));
+
+    // every argument node is both written into the input row and recorded as a sub node
+    UpdateRowStatement updateRowStatement = new UpdateRowStatement(rowName);
+    for (Expression expression : functionExpression.getExpressions()) {
+      ExpressionNode subNode = codegenContext.getExpressionNode(expression);
+      updateRowStatement.addData(subNode);
+      functionExpressionNode.addSubExpressionNode(subNode);
+    }
+
+    // udtf may contain TimestampOperand
+    // to avoid repeat of TimestampOperand, all TimestampOperand will be replaced with
+    // globalTimestampOperand
+    if (Objects.isNull(globalTimestampOperand)) {
+      globalTimestampOperand = new TimestampOperand();
+    }
+
+    if (!codegenContext.isExpressionExisted(globalTimestampOperand)) {
+      LeafExpressionNode timestamp = new LeafExpressionNode("timestamp");
+      codegenContext.addExpression(globalTimestampOperand, timestamp);
+    }
+
+    codegenContext.addExpression(functionExpression, functionExpressionNode);
+
+    DeclareStatement declareStatement =
+        createDeclareStatement(
+            codegenContext.inferType(functionExpression), functionExpressionNode);
+    codegenContext.addIntermediateVariable(declareStatement);
+    codegenContext.addAssignmentStatement(
+        new UDTFAssignmentStatement(functionExpressionNode, updateRowStatement));
+    return true;
+  }
+
+  /**
+   * Fallback for unary expressions without a dedicated visit method in this class.
+   *
+   * @return always {@code false}, i.e. no code is generated
+   */
+  @Override
+  public Boolean visitUnaryExpression(
+      UnaryExpression unaryExpression, CodegenContext codegenContext) {
+    // like, in and some other unaryExpression haven't been handled
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BetweenExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BetweenExpressionNode.java
new file mode 100644
index 0000000000..8e10740b25
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BetweenExpressionNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Expression node producing the code of a (NOT) BETWEEN check: {@code first>=second&&first<=third},
+ * negated and bracketed when {@code isNotBetween} is set.
+ */
+public class BetweenExpressionNode extends ExpressionNodeImpl {
+  private final ExpressionNode firstNode;
+  private final ExpressionNode secondNode;
+  private final ExpressionNode thirdNode;
+  private final boolean isNotBetween;
+
+  /**
+   * @param nodeName name of the variable holding the result
+   * @param firstNode value that is checked
+   * @param secondNode lower bound (inclusive)
+   * @param thirdNode upper bound (inclusive)
+   * @param isNotBetween whether the check is negated
+   */
+  public BetweenExpressionNode(
+      String nodeName,
+      ExpressionNode firstNode,
+      ExpressionNode secondNode,
+      ExpressionNode thirdNode,
+      boolean isNotBetween) {
+    this.firstNode = firstNode;
+    this.secondNode = secondNode;
+    this.thirdNode = thirdNode;
+    this.nodeName = nodeName;
+    this.isNotBetween = isNotBetween;
+  }
+
+  @Override
+  public String toCode() {
+    String value = operandCode(firstNode);
+    String betweenCode =
+        value + ">=" + operandCode(secondNode) + "&&" + value + "<=" + operandCode(thirdNode);
+    return isNotBetween ? "!" + bracket(betweenCode) : betweenCode;
+  }
+
+  /**
+   * Returns the variable name of {@code node}, or its bracketed code when it has no name (for
+   * example a constant bound). Appending a missing name directly would emit the text "null".
+   */
+  private String operandCode(ExpressionNode node) {
+    String name = node.getNodeName();
+    return name != null ? name : bracket(node.toCode());
+  }
+
+  /** @return the node names of the checked value and both bounds, in that order */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> subNodes = new ArrayList<>();
+    subNodes.add(firstNode.getNodeName());
+    subNodes.add(secondNode.getNodeName());
+    subNodes.add(thirdNode.getNodeName());
+    return subNodes;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BinaryExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BinaryExpressionNode.java
new file mode 100644
index 0000000000..bc3ecda49b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/BinaryExpressionNode.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Expression node producing {@code left op right}. An operand is rendered by its node name when it
+ * has one and by its bracketed code otherwise.
+ */
+public class BinaryExpressionNode extends ExpressionNodeImpl {
+
+  private final String op;
+
+  private final ExpressionNode leftNode;
+
+  private final ExpressionNode rightNode;
+
+  /** Creates a named node, i.e. one whose result is stored in the variable {@code nodeName}. */
+  public BinaryExpressionNode(
+      String nodeName, String op, ExpressionNode leftNode, ExpressionNode rightNode) {
+    this.nodeName = nodeName;
+    this.op = op;
+    this.leftNode = leftNode;
+    this.rightNode = rightNode;
+  }
+
+  /** Creates a node without a name. */
+  public BinaryExpressionNode(String op, ExpressionNode leftNode, ExpressionNode rightNode) {
+    this.nodeName = null;
+    this.op = op;
+    this.leftNode = leftNode;
+    this.rightNode = rightNode;
+  }
+
+  @Override
+  public String toCode() {
+    return operandCode(leftNode) + op + operandCode(rightNode);
+  }
+
+  /** Renders one operand: its name if present, otherwise its code wrapped in brackets. */
+  private String operandCode(ExpressionNode operand) {
+    String name = operand.getNodeName();
+    if (name == null) {
+      return bracket(operand.toCode());
+    }
+    return name;
+  }
+
+  /** @return the node names of the left and the right operand, in that order */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    ArrayList<String> operandNames = new ArrayList<>();
+    operandNames.add(leftNode.getNodeName());
+    operandNames.add(rightNode.getNodeName());
+    return operandNames;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/CodeExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/CodeExpressionNode.java
new file mode 100644
index 0000000000..5245ae204e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/CodeExpressionNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Unnamed expression node that wraps a ready-made code fragment. */
+public class CodeExpressionNode extends ExpressionNodeImpl {
+  private final String code;
+
+  /** @param code code fragment returned verbatim by {@link #toCode()} */
+  public CodeExpressionNode(String code) {
+    nodeName = null;
+    this.code = code;
+  }
+
+  @Override
+  public String toCode() {
+    return code;
+  }
+
+  /**
+   * @return an empty list, as the other nodes without sub nodes do; returning {@code null} would
+   *     make callers that iterate over the result fail
+   */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    return new ArrayList<>();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ConstantExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ConstantExpressionNode.java
new file mode 100644
index 0000000000..839104dace
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ConstantExpressionNode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Expression node for a constant; its code is the literal text it was created with. */
+public class ConstantExpressionNode extends ExpressionNodeImpl {
+
+  private final String literal;
+
+  /** @param literal text returned verbatim by {@link #toCode()} */
+  public ConstantExpressionNode(String literal) {
+    this.literal = literal;
+  }
+
+  @Override
+  public String toCode() {
+    return this.literal;
+  }
+
+  /** @return a new empty list */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> noNodes = new ArrayList<>();
+    return noNodes;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNode.java
new file mode 100644
index 0000000000..ea1fd049dc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.List;
+
+/**
+ * An expression node helps to generate the code string of an expression. The generated string is
+ * an expression, not a complete Java statement.
+ */
+public interface ExpressionNode {
+
+  /**
+   * Gets the code string of the expression. If a sub node has a name, the full code of that sub
+   * node is replaced with its node name. For example, for {@code a + b * c} where the sub node
+   * {@code b * c} has the name "var1", the return value will be {@code a + var1}.
+   */
+  String toCode();
+
+  /** @return name of this intermediate variable */
+  String getNodeName();
+
+  /** @return all intermediate variables which can cause this variable to be null */
+  List<String> getIsNullCheckNodes();
+
+  /** Sets the name of this intermediate variable. */
+  void setNodeName(String nodeName);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNodeImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNodeImpl.java
new file mode 100644
index 0000000000..238f26eced
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ExpressionNodeImpl.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+/** Base class of expression nodes: stores the node name and offers a bracketing helper. */
+public abstract class ExpressionNodeImpl implements ExpressionNode {
+
+  // name of the variable holding this node's value
+  protected String nodeName;
+
+  @Override
+  public String getNodeName() {
+    return this.nodeName;
+  }
+
+  @Override
+  public void setNodeName(String nodeName) {
+    this.nodeName = nodeName;
+  }
+
+  /** Wraps {@code code} in round brackets. */
+  protected String bracket(String code) {
+    return new StringBuilder().append("(").append(code).append(")").toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/FunctionExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/FunctionExpressionNode.java
new file mode 100644
index 0000000000..9056054417
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/FunctionExpressionNode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Expression node producing the call {@code udtfCall(executorName, rowName)} for a UDTF. The sub
+ * nodes are the arguments of the function.
+ */
+public class FunctionExpressionNode extends ExpressionNodeImpl {
+  private final String rowName;
+
+  private final String executorName;
+
+  private final TSDataType tsDataType;
+
+  // created lazily by addSubExpressionNode; stays null while no argument has been added
+  private List<ExpressionNode> subNodes;
+
+  /**
+   * @param nodeName name of the variable holding the result
+   * @param executorName code referring to the executor of the function
+   * @param rowName code referring to the input row of the function
+   * @param tsDataType data type of the result
+   */
+  public FunctionExpressionNode(
+      String nodeName, String executorName, String rowName, TSDataType tsDataType) {
+    this.nodeName = nodeName;
+    this.executorName = executorName;
+    this.tsDataType = tsDataType;
+    this.rowName = rowName;
+  }
+
+  /**
+   * @return simple name of the Java wrapper class matching the result data type
+   * @throws UnSupportedDataTypeException if the data type is not INT32, INT64, FLOAT, DOUBLE or
+   *     BOOLEAN
+   */
+  public String getType() {
+    switch (tsDataType) {
+      case INT32:
+        return "Integer";
+      case INT64:
+        // INT64 is the type backed by Long; TEXT is not supported here
+        return "Long";
+      case FLOAT:
+        return "Float";
+      case DOUBLE:
+        return "Double";
+      case BOOLEAN:
+        return "Boolean";
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported for udtf codegen.", tsDataType));
+    }
+  }
+
+  @Override
+  public String toCode() {
+    return "udtfCall(" + executorName + ", " + rowName + ")";
+  }
+
+  /** @return the node names of all sub nodes; empty when no sub node has been added */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> subNodeNames = new ArrayList<>();
+    if (Objects.isNull(subNodes)) {
+      // a function without arguments has nothing to check
+      return subNodeNames;
+    }
+    for (ExpressionNode node : subNodes) {
+      subNodeNames.add(node.getNodeName());
+    }
+    return subNodeNames;
+  }
+
+  /** Appends {@code subNode} to the arguments of this function. */
+  public void addSubExpressionNode(ExpressionNode subNode) {
+    if (Objects.isNull(subNodes)) {
+      subNodes = new ArrayList<>();
+    }
+    subNodes.add(subNode);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IdentityExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IdentityExpressionNode.java
new file mode 100644
index 0000000000..8ac65b2f0d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IdentityExpressionNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Expression node whose generated code is simply its own node name. */
+public class IdentityExpressionNode extends ExpressionNodeImpl {
+
+  /** @param nodeName the name emitted verbatim by {@link #toCode()} */
+  public IdentityExpressionNode(String nodeName) {
+    this.nodeName = nodeName;
+  }
+
+  /** Emits the node name unchanged. */
+  @Override
+  public String toCode() {
+    return this.nodeName;
+  }
+
+  /** Reports no names to null-check: always returns a fresh, empty list. */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> nothingToCheck = new ArrayList<>();
+    return nothingToCheck;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/InExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/InExpressionNode.java
new file mode 100644
index 0000000000..d58f568b87
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/InExpressionNode.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Expression node for an IN / NOT IN test, rendered as {@code <set>.contains(<operand>)} with an
+ * optional leading negation.
+ */
+public class InExpressionNode extends ExpressionNodeImpl {
+  /** Name of the set variable that {@code contains} is called on. */
+  private final String setName;
+
+  /** Expression whose value is looked up in the set. */
+  private final ExpressionNode subNode;
+
+  /** When true the generated test is negated. */
+  private final boolean isNotIn;
+
+  public InExpressionNode(
+      String nodeName, ExpressionNode subNode, String setNameNode, boolean isNotIn) {
+    this.nodeName = nodeName;
+    this.subNode = subNode;
+    this.setName = setNameNode;
+    this.isNotIn = isNotIn;
+  }
+
+  /** Creates the node without assigning a node name. */
+  public InExpressionNode(ExpressionNode subNode, String setName, boolean isNotIn) {
+    this.subNode = subNode;
+    this.setName = setName;
+    this.isNotIn = isNotIn;
+  }
+
+  /**
+   * Builds the membership test. The operand is the sub node's name when it has one, otherwise the
+   * sub node's own generated code.
+   */
+  @Override
+  public String toCode() {
+    String operand = subNode.getNodeName();
+    if (operand == null) {
+      operand = subNode.toCode();
+    }
+    String membership = setName + ".contains(" + operand + ")";
+    if (isNotIn) {
+      return "! " + membership;
+    }
+    return membership;
+  }
+
+  /** Returns a new single-element list holding the sub node's name (which may be null). */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> names = new ArrayList<>();
+    names.add(subNode.getNodeName());
+    return names;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IsNullExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IsNullExpressionNode.java
new file mode 100644
index 0000000000..9fd8b0f2c9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/IsNullExpressionNode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Expression node for IS NULL / IS NOT NULL, rendered as a comparison of the operand's {@code
+ * <name>IsNull} flag against a boolean literal.
+ */
+public class IsNullExpressionNode extends ExpressionNodeImpl {
+  /** Expression whose null flag is inspected. */
+  private final ExpressionNode subExpression;
+
+  /** True for IS NOT NULL, false for IS NULL. */
+  private final boolean isNotNull;
+
+  public IsNullExpressionNode(String nodeName, ExpressionNode subExpression, boolean isNotNull) {
+    this.nodeName = nodeName;
+    this.subExpression = subExpression;
+    this.isNotNull = isNotNull;
+  }
+
+  /** Emits {@code <name>IsNull == false} for IS NOT NULL and {@code <name>IsNull == true} else. */
+  @Override
+  public String toCode() {
+    String expectedFlag;
+    if (isNotNull) {
+      expectedFlag = "false";
+    } else {
+      expectedFlag = "true";
+    }
+    return subExpression.getNodeName() + "IsNull == " + expectedFlag;
+  }
+
+  /** Reports no names to null-check: always returns a fresh, empty list. */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> nothingToCheck = new ArrayList<>();
+    return nothingToCheck;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/LeafExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/LeafExpressionNode.java
new file mode 100644
index 0000000000..e6296a0fe0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/LeafExpressionNode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Leaf of an expression tree: its generated code is its node name. */
+public class LeafExpressionNode extends ExpressionNodeImpl {
+
+  /** @param nodeName name used both as generated code and as the name to null-check */
+  public LeafExpressionNode(String nodeName) {
+    this.nodeName = nodeName;
+  }
+
+  /** Emits the value returned by {@code getNodeName()}. */
+  @Override
+  public String toCode() {
+    return getNodeName();
+  }
+
+  /** Returns a new single-element list containing this node's own name. */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> names = new ArrayList<>();
+    names.add(this.nodeName);
+    return names;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/NewExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/NewExpressionNode.java
new file mode 100644
index 0000000000..badb9ac70a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/NewExpressionNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Expression node that renders an object creation: {@code new <type>(<arg>, <arg>, ...)}. */
+public class NewExpressionNode extends ExpressionNodeImpl {
+
+  /** Name of the type to instantiate, emitted verbatim. */
+  private final String type;
+
+  /** Constructor arguments as already-generated code fragments; may be empty. */
+  private final String[] args;
+
+  /**
+   * @param type name of the type to instantiate
+   * @param args code fragments used as constructor arguments, in order
+   */
+  public NewExpressionNode(String type, String... args) {
+    this.type = type;
+    this.args = args;
+  }
+
+  /**
+   * Builds the constructor call.
+   *
+   * @return {@code new <type>(...)} with the arguments separated by {@code ", "}; an empty argument
+   *     list yields {@code new <type>()}
+   */
+  @Override
+  public String toCode() {
+    // Joining avoids trimming a trailing separator, which corrupted the output for zero arguments.
+    return "new " + type + "(" + String.join(", ", args) + ")";
+  }
+
+  /** Reports no names to null-check: always returns a fresh, empty list. */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    return new ArrayList<>();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ReturnValueExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ReturnValueExpressionNode.java
new file mode 100644
index 0000000000..b75cd07b66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/ReturnValueExpressionNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Expression node that renders a method call used for its return value: {@code
+ * <variable>.<method>(<args>)}, or {@code <method>(<args>)} when no variable is given.
+ */
+public class ReturnValueExpressionNode extends ExpressionNodeImpl {
+
+  /** Receiver of the call; null means the call is emitted without a receiver. */
+  private final String variableName;
+
+  /** Name of the method to call. */
+  private final String methodName;
+
+  /** Call arguments as already-generated code fragments. */
+  private final String[] args;
+
+  public ReturnValueExpressionNode(String variableName, String methodName, String... args) {
+    this.variableName = variableName;
+    this.methodName = methodName;
+    this.args = args;
+  }
+
+  /** Builds the call expression; arguments are separated by {@code ", "}. */
+  @Override
+  public String toCode() {
+    String receiver = variableName != null ? variableName + "." : "";
+    String argumentList = String.join(", ", args);
+    return receiver + methodName + "(" + argumentList + ")";
+  }
+
+  /** Reports no names to null-check: always returns a fresh, empty list. */
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    List<String> nothingToCheck = new ArrayList<>();
+    return nothingToCheck;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/UnaryExpressionNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/UnaryExpressionNode.java
new file mode 100644
index 0000000000..8af78711e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/expressionnode/UnaryExpressionNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class UnaryExpressionNode extends ExpressionNodeImpl {
+  private final ExpressionNode subNode;
+
+  private final String op;
+
+  public UnaryExpressionNode(String nodeName, ExpressionNode subNode, String op) {
+    this.nodeName = nodeName;
+    this.subNode = subNode;
+    this.op = op;
+  }
+
+  @Override
+  public String toCode() {
+    if (subNode.getNodeName() != null) {
+      return op + subNode.getNodeName();
+    }
+    return op + bracket(subNode.toCode());
+  }
+
+  @Override
+  public List<String> getIsNullCheckNodes() {
+    ArrayList<String> subNodes = new ArrayList<>();
+    subNodes.add(subNode.getNodeName());
+    return subNodes;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/AssignmentStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/AssignmentStatement.java
new file mode 100644
index 0000000000..85dbda2781
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/AssignmentStatement.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ConstantExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+
+import java.util.List;
+
+/**
+ * Statement that assigns the value of an expression to a variable: {@code <var> = <expr>;}.
+ *
+ * <p>It can also derive the condition under which the assigned value has to be treated as null,
+ * see {@link #getNullCondition()}.
+ */
+public class AssignmentStatement implements Statement {
+  /** Name of the variable being assigned. */
+  private final String varName;
+
+  /** Expression whose generated code forms the right-hand side. */
+  private final ExpressionNode es;
+
+  /**
+   * @param varName name of the variable to assign
+   * @param es expression providing the assigned value
+   */
+  public AssignmentStatement(String varName, ExpressionNode es) {
+    this.varName = varName;
+    this.es = es;
+  }
+
+  /** Assigns the expression to the variable named after the expression's own node name. */
+  public AssignmentStatement(ExpressionNode es) {
+    this.es = es;
+    this.varName = es.getNodeName();
+  }
+
+  public String getVarName() {
+    return varName;
+  }
+
+  public ExpressionNode getExpressionNode() {
+    return es;
+  }
+
+  public String getNodeName() {
+    return es.getNodeName();
+  }
+
+  @Override
+  public String toCode() {
+    return varName + " = " + es.toCode() + ";\n";
+  }
+
+  /**
+   * Builds the condition under which the assigned value is null: the {@code <name>IsNull} flags of
+   * all nodes reported by the expression, joined with {@code ||}. Null names are skipped.
+   *
+   * @return the condition, or the constant {@code false} when there is no name to check
+   */
+  public ExpressionNode getNullCondition() {
+    List<String> subNodes = es.getIsNullCheckNodes();
+    StringBuilder conditionCode = new StringBuilder();
+    if (subNodes != null) {
+      for (String subNodeName : subNodes) {
+        if (subNodeName == null) {
+          continue;
+        }
+        // Insert the separator in front of every flag except the first, so nothing is trimmed.
+        if (conditionCode.length() > 0) {
+          conditionCode.append(" || ");
+        }
+        conditionCode.append(subNodeName).append("IsNull");
+      }
+    }
+    if (conditionCode.length() == 0) {
+      // no condition, always treat as variable will not be null; this also covers a list that
+      // contains only null names, which previously failed while trimming the empty buffer
+      return new ConstantExpressionNode("false");
+    }
+    return new ConstantExpressionNode(conditionCode.toString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/DeclareStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/DeclareStatement.java
new file mode 100644
index 0000000000..ae95bfc382
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/DeclareStatement.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+// declare a new variable
+public class DeclareStatement implements Statement {
+
+  // not only declare a new variable
+  // this class will also check whether current value is null
+  protected String varName;
+
+  protected String type;
+
+  public DeclareStatement(String type, String varName) {
+    this.type = type;
+    this.varName = varName;
+  }
+
+  public String getVarName() {
+    return varName;
+  }
+
+  @Override
+  public String toCode() {
+    return type + " " + varName + ";\n";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/IfStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/IfStatement.java
new file mode 100644
index 0000000000..b6c11dbd3c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/IfStatement.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Statement that renders an {@code if} block with an optional {@code else} branch. Whether an else
+ * branch exists is fixed at construction time.
+ */
+public class IfStatement implements Statement {
+
+  /** Condition of the if; must be set through {@link #setCondition} before generating code. */
+  private ExpressionNode condition;
+
+  /** Statements of the if branch, in insertion order. */
+  private final List<Statement> ifBody = new ArrayList<>();
+
+  /** Whether this statement has an else branch. */
+  private final boolean haveElse;
+
+  /** Statements of the else branch; null when there is no else branch. */
+  private final List<Statement> elseBody;
+
+  /** @param haveElse true to create the statement with an (initially empty) else branch */
+  public IfStatement(boolean haveElse) {
+    this.haveElse = haveElse;
+    this.elseBody = haveElse ? new ArrayList<>() : null;
+  }
+
+  public boolean isHaveElse() {
+    return haveElse;
+  }
+
+  /** Appends a statement to the if branch and returns this object for chaining. */
+  public IfStatement addIfBodyStatement(Statement statement) {
+    ifBody.add(statement);
+    return this;
+  }
+
+  /**
+   * Appends a statement to the else branch and returns this object for chaining. When the
+   * statement was created without an else branch the call is silently ignored.
+   */
+  public IfStatement addElseBodyStatement(Statement statement) {
+    // TODO: throw exception
+    if (isHaveElse()) {
+      elseBody.add(statement);
+    }
+    return this;
+  }
+
+  public void setCondition(ExpressionNode condition) {
+    this.condition = condition;
+  }
+
+  /** Renders the condition, the if branch and, if present, the else branch. */
+  @Override
+  public String toCode() {
+    StringBuilder out = new StringBuilder("if(");
+    out.append(condition.toCode()).append("){\n");
+    appendBody(out, ifBody);
+
+    if (isHaveElse()) {
+      out.append("} else {\n");
+      appendBody(out, elseBody);
+    }
+
+    return out.append("}\n").toString();
+  }
+
+  /** Appends the code of every statement in {@code body}, each prefixed with four spaces. */
+  private static void appendBody(StringBuilder out, List<Statement> body) {
+    for (Statement statement : body) {
+      out.append("    ").append(statement.toCode());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/MethodCallStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/MethodCallStatement.java
new file mode 100644
index 0000000000..13dc4c31ee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/MethodCallStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+/**
+ * Statement that renders a method call whose return value is not used: {@code
+ * <variable>.<method>(<args>);}, or {@code <method>(<args>);} when no variable is given.
+ */
+public class MethodCallStatement implements Statement {
+
+  /** Receiver of the call; null means the call is emitted without a receiver. */
+  private final String variableName;
+
+  /** Name of the method to call. */
+  private final String methodName;
+
+  /** Call arguments as already-generated code fragments. */
+  private final String[] args;
+
+  public MethodCallStatement(String variableName, String functionName, String... args) {
+    this.variableName = variableName;
+    this.methodName = functionName;
+    this.args = args;
+  }
+
+  /** Builds the call statement; arguments are separated by {@code ", "}. */
+  @Override
+  public String toCode() {
+    String receiver = variableName != null ? variableName + "." : "";
+    String argumentList = String.join(", ", args);
+    return receiver + methodName + "(" + argumentList + ");\n";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/ReturnStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/ReturnStatement.java
new file mode 100644
index 0000000000..4cea82ce10
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/ReturnStatement.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+
+public class ReturnStatement implements Statement {
+  private ExpressionNode es;
+
+  public ReturnStatement(ExpressionNode es) {
+    this.es = es;
+  }
+
+  @Override
+  public String toCode() {
+    return "return " + es.toCode() + ";\n";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/Statement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/Statement.java
new file mode 100644
index 0000000000..f37d5af949
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/Statement.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+/** A piece of generated code that can render itself as source text. */
+public interface Statement {
+  /**
+   * Renders this statement as source code.
+   *
+   * @return the generated code for this statement
+   */
+  String toCode();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/StatementBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/StatementBlock.java
new file mode 100644
index 0000000000..09e944b5fe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/StatementBlock.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** An ordered sequence of statements that is rendered as one piece of code. */
+public class StatementBlock implements Statement {
+  /** Statements of the block, in insertion order. */
+  private final List<Statement> statements;
+
+  public StatementBlock() {
+    statements = new ArrayList<>();
+  }
+
+  /** Appends {@code statement} to the end of the block. */
+  public void addStatement(Statement statement) {
+    statements.add(statement);
+  }
+
+  /**
+   * Renders all statements in order, each followed by a line break.
+   *
+   * @return the concatenated code; an empty string for an empty block
+   */
+  @Override
+  public String toCode() {
+    StringBuilder code = new StringBuilder();
+    for (Statement statement : statements) {
+      // Must be the newline escape; the literal "/n" would end up as stray text in the output.
+      code.append(statement.toCode()).append("\n");
+    }
+    return code.toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UDTFAssignmentStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UDTFAssignmentStatement.java
new file mode 100644
index 0000000000..3a8dc30450
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UDTFAssignmentStatement.java
@@ -0,0 +1,81 @@
+/* *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.CodeExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ConstantExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.FunctionExpressionNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper that generates the statements needed to assign the result of a UDTF call.
+ *
+ * <p>The generated code updates the input row, stores the call result in an {@code Object}
+ * variable named {@code <node>Object}, and then either sets {@code <node>IsNull} to true (result
+ * is null) or casts the result to the function's type and assigns it to {@code <node>}.
+ */
+public class UDTFAssignmentStatement extends AssignmentStatement {
+  /** The generated statements, in the order they are emitted by {@link #toCode()}. */
+  private final List<Statement> ifBodyStatements;
+
+  /**
+   * @param functionExpressionNode the UDTF call whose result is assigned
+   * @param updateRowStatement statement emitted first, before the call itself
+   */
+  public UDTFAssignmentStatement(
+      FunctionExpressionNode functionExpressionNode, UpdateRowStatement updateRowStatement) {
+    super(functionExpressionNode);
+    this.ifBodyStatements = new ArrayList<>();
+
+    String objectName = getNodeName() + "Object";
+    ifBodyStatements.add(updateRowStatement);
+    ifBodyStatements.add(new DeclareStatement("Object", objectName));
+    ifBodyStatements.add(new AssignmentStatement(objectName, getExpressionNode()));
+
+    // A null result marks the node as null; otherwise the result is cast and assigned.
+    IfStatement objectIsNull = new IfStatement(true);
+    objectIsNull.setCondition(new ConstantExpressionNode(objectName + " == null"));
+    objectIsNull.addIfBodyStatement(
+        new AssignmentStatement(getNodeName() + "IsNull", new ConstantExpressionNode("true")));
+    objectIsNull.addElseBodyStatement(
+        new AssignmentStatement(
+            getNodeName(),
+            new CodeExpressionNode("(" + functionExpressionNode.getType() + ")" + objectName)));
+    ifBodyStatements.add(objectIsNull);
+  }
+
+  /** Renders all generated statements one after another. */
+  @Override
+  public String toCode() {
+    StringBuilder code = new StringBuilder();
+    for (Statement statement : ifBodyStatements) {
+      code.append(statement.toCode());
+    }
+    return code.toString();
+  }
+
+  /**
+   * Builds the null condition of the call: the {@code <name>IsNull} flags of all nodes reported by
+   * the expression, joined with {@code &&}. Null names are skipped.
+   *
+   * @return the condition, or the constant {@code false} when there is no name to check
+   */
+  @Override
+  public ExpressionNode getNullCondition() {
+    List<String> subNodes = getExpressionNode().getIsNullCheckNodes();
+    StringBuilder conditionCode = new StringBuilder();
+    if (subNodes != null) {
+      for (String subNodeName : subNodes) {
+        if (subNodeName == null) {
+          continue;
+        }
+        // Insert the separator in front of every flag except the first, so nothing is trimmed.
+        if (conditionCode.length() > 0) {
+          conditionCode.append(" && ");
+        }
+        conditionCode.append(subNodeName).append("IsNull");
+      }
+    }
+    if (conditionCode.length() == 0) {
+      // no condition, always treat as variable will not be null; this also covers a list that
+      // contains only null names, which previously failed while trimming the empty buffer
+      return new ConstantExpressionNode("false");
+    }
+    return new ConstantExpressionNode(conditionCode.toString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UpdateRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UpdateRowStatement.java
new file mode 100644
index 0000000000..547a03bf5b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/UpdateRowStatement.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Generates the statement that refills an existing UDTF input row; it never creates a new row.
+ */
+public class UpdateRowStatement implements Statement {
+  /** Nodes whose names are passed to setData; kept non-null so toCode() works with no columns. */
+  private List<ExpressionNode> columns = new ArrayList<>();
+
+  /** Name of the row variable that the generated code calls setData on. */
+  private final String varName;
+
+  public UpdateRowStatement(String varName) {
+    this.varName = varName;
+  }
+
+  /** Appends one column to the row. */
+  public void addData(ExpressionNode expressionNode) {
+    columns.add(expressionNode);
+  }
+
+  /** Replaces all columns (the list is kept by reference); null is treated as "no columns". */
+  public void setColumns(List<ExpressionNode> columns) {
+    this.columns = Objects.isNull(columns) ? new ArrayList<>() : columns;
+  }
+
+  /** @return {@code <varName>.setData(timestamp, <nodeName>, ...);} followed by a newline */
+  @Override
+  public String toCode() {
+    StringBuilder newRow = new StringBuilder().append(varName).append(".setData(timestamp");
+    for (ExpressionNode column : columns) {
+      newRow.append(", ").append(column.getNodeName());
+    }
+    return newRow.append(");\n").toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/WhileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/WhileStatement.java
new file mode 100644
index 0000000000..0751f82658
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/statements/WhileStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.statements;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.expressionnode.ExpressionNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@code while} loop of the generated code: a condition plus an ordered statement list. */
+public class WhileStatement implements Statement {
+  /** Expression whose code goes between the parentheses of the loop header. */
+  private final ExpressionNode condition;
+
+  /** Statements emitted between the braces, in the order they were added. */
+  private List<Statement> loopBody = new ArrayList<>();
+
+  public WhileStatement(ExpressionNode condition) {
+    this.condition = condition;
+  }
+
+  /** Appends a statement to the end of the loop body. */
+  public void addLoopStatement(Statement statement) {
+    loopBody.add(statement);
+  }
+
+  /** Renders the header, then every body statement prefixed by two spaces, then the brace. */
+  @Override
+  public String toCode() {
+    StringBuilder rendered = new StringBuilder("while(");
+    rendered.append(condition.toCode()).append("){\n");
+    for (Statement bodyStatement : loopBody) {
+      rendered.append("  ").append(bodyStatement.toCode()).append("\n");
+    }
+    return rendered.append("}").toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodeGenEvaluatorBaseClass.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodeGenEvaluatorBaseClass.java
new file mode 100644
index 0000000000..5249eeeeb4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodeGenEvaluatorBaseClass.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils;
+
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.udf.api.access.Row;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class CodeGenEvaluatorBaseClass {
+  // Injected through the setters below; this class only stores isNull, executors and rows.
+  protected Map<String, Boolean> isNull;
+  protected List<TSDataType> outputDataTypes;
+  protected List<Boolean> outputExpressionGenerateSuccess;
+  protected UDTFExecutor[] executors;
+  protected CodegenSimpleRow[] rows;
+
+  // State of the block currently being evaluated, refreshed on every evaluate() call.
+  protected TsBlock inputTsBlock;
+  protected TimeColumn timeColumn;
+  protected Column[] valueColumns;
+  protected long timestamp;
+
+  // Output side: the builder is created on first use and reset for every later block.
+  protected TsBlockBuilder tsBlockBuilder;
+  protected ColumnBuilder[] outputColumns;
+
+  public void setIsNull(Map<String, Boolean> isNull) {
+    this.isNull = isNull;
+  }
+
+  public void setOutputDataTypes(List<TSDataType> outputDataTypes) {
+    this.outputDataTypes = outputDataTypes;
+  }
+
+  public void setOutputExpressionGenerateSuccess(List<Boolean> outputExpressionGenerateSuccess) {
+    this.outputExpressionGenerateSuccess = outputExpressionGenerateSuccess;
+  }
+
+  public void setExecutors(UDTFExecutor[] executors) {
+    this.executors = executors;
+  }
+
+  public void setRows(CodegenSimpleRow[] rows) {
+    this.rows = rows;
+  }
+
+  /** Runs {@code udtfExecutor} on {@code row} and returns the executor's current value. */
+  public static Object udtfCall(UDTFExecutor udtfExecutor, Row row) {
+    udtfExecutor.execute(row);
+    return udtfExecutor.getCurrentValue();
+  }
+
+  /** Creates the output builder on first use, resets it afterwards, and re-reads its columns. */
+  protected void resetOutputTsBlock() {
+    if (tsBlockBuilder != null) {
+      tsBlockBuilder.reset();
+    } else {
+      tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    }
+    outputColumns = tsBlockBuilder.getValueColumnBuilders();
+  }
+
+  /** Caches the time column and the value columns of the current input block. */
+  protected void resetColumns() {
+    timeColumn = inputTsBlock.getTimeColumn();
+    valueColumns = inputTsBlock.getValueColumns();
+  }
+
+  /**
+   * Evaluates every row of the block through evaluateByRow and builds the output columns. An
+   * entry of the result stays null when its outputExpressionGenerateSuccess flag is false.
+   */
+  public Column[] evaluate(TsBlock inputTsBlock) {
+    this.inputTsBlock = inputTsBlock;
+    resetOutputTsBlock();
+    resetColumns();
+    final int rowCount = inputTsBlock.getPositionCount();
+    for (int row = 0; row < rowCount; row++) {
+      timestamp = timeColumn.getLong(row);
+      evaluateByRow(row);
+    }
+    tsBlockBuilder.declarePositions(rowCount);
+
+    Column[] built = new Column[outputColumns.length];
+    for (int col = 0; col < built.length; col++) {
+      built[col] = outputExpressionGenerateSuccess.get(col) ? outputColumns[col].build() : null;
+    }
+    return built;
+  }
+
+  protected abstract void evaluateByRow(int i);
+
+  protected abstract void updateInputVariables(int i);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodegenSimpleRow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodegenSimpleRow.java
new file mode 100644
index 0000000000..9e51e4ed16
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/codegen/utils/CodegenSimpleRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.codegen.utils;
+
+import org.apache.iotdb.commons.udf.utils.UDFBinaryTransformer;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.type.Binary;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class CodegenSimpleRow implements Row {
+  // Row content: values and timestamp are replaced together on every setData() call.
+  Object[] values;
+  long timestamp;
+  TSDataType[] tsDataTypes;
+
+  public CodegenSimpleRow(TSDataType[] tsDataTypes) {
+    this.tsDataTypes = tsDataTypes;
+  }
+
+  public void setData(long timestamp, Object... values) {
+    this.values = values; // the varargs array is kept as is, not copied
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public long getTime() throws IOException {
+    return timestamp;
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws IOException {
+    return (Integer) values[columnIndex]; // unboxes the stored object, no numeric conversion
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws IOException {
+    return (Long) values[columnIndex];
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws IOException {
+    return (Float) values[columnIndex];
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws IOException {
+    return (Double) values[columnIndex];
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws IOException {
+    return (Boolean) values[columnIndex];
+  }
+
+  @Override
+  public Binary getBinary(int columnIndex) throws IOException {
+    Object raw = values[columnIndex]; // cast below expects a TsFile Binary
+    return UDFBinaryTransformer.transformToUDFBinary((org.apache.iotdb.tsfile.utils.Binary) raw);
+  }
+
+  @Override
+  public String getString(int columnIndex) throws IOException {
+    return ((org.apache.iotdb.tsfile.utils.Binary) values[columnIndex]).getStringValue();
+  }
+
+  @Override
+  public Type getDataType(int columnIndex) {
+    return UDFDataTypeTransformer.transformToUDFDataType(tsDataTypes[columnIndex]);
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return Objects.isNull(values[columnIndex]);
+  }
+
+  @Override
+  public int size() {
+    return tsDataTypes.length; // number of declared column types, independent of values
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
index 58722f7da5..d5a90e6443 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
@@ -242,6 +242,10 @@ public abstract class BinaryExpression extends Expression {
     return builder.toString();
   }
 
+  public String getOperator() { // public accessor for the protected operator() declared below
+    return operator();
+  }
+
   protected abstract String operator();
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
index 62e91a9348..8bee41140f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
@@ -83,6 +83,10 @@ public class BetweenExpression extends TernaryExpression {
     ReadWriteIOUtils.write(isNotBetween, stream);
   }
 
+  public Expression getExpression() { // NOTE(review): yields this node itself - confirm intended
+    return this;
+  }
+
   @Override
   public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
     return visitor.visitBetweenExpression(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index d83990cde3..15648b1529 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.codegen.CodegenContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -182,6 +183,7 @@ import org.apache.commons.lang3.Validate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -842,12 +844,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       }
     }
 
+    CodegenContext codegenContext =
+        new CodegenContext(
+            inputLocations,
+            inputDataTypes,
+            Arrays.asList(node.getOutputExpressions()),
+            null,
+            expressionTypes);
+
     // Use FilterAndProject Operator when project expressions are all mappable
     if (!hasNonMappableUDF) {
       // init project UDTFContext
       UDTFContext projectContext = new UDTFContext(node.getZoneId());
       projectContext.constructUdfExecutors(projectExpressions);
 
+      codegenContext.setUdtfContext(projectContext);
+
       List<ColumnTransformer> projectOutputTransformerList = new ArrayList<>();
       Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = new HashMap<>();
 
@@ -873,6 +885,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       }
 
       return new FilterAndProjectOperator(
+          codegenContext,
           operatorContext,
           inputOperator,
           inputDataTypes,
@@ -975,12 +988,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = new HashMap<>();
 
+    CodegenContext codegenContext =
+        new CodegenContext(
+            inputLocations,
+            inputDataTypes,
+            Arrays.asList(node.getOutputExpressions()),
+            node.getPredicate(),
+            expressionTypes);
+
     // init project transformer when project expressions are all mappable
     if (!hasNonMappableUDF) {
       // init project UDTFContext
       UDTFContext projectContext = new UDTFContext(node.getZoneId());
       projectContext.constructUdfExecutors(projectExpressions);
 
+      codegenContext.setUdtfContext(projectContext);
+
       ColumnTransformerVisitor.ColumnTransformerVisitorContext projectColumnTransformerContext =
           new ColumnTransformerVisitor.ColumnTransformerVisitorContext(
               projectContext,
@@ -1001,6 +1024,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     Operator filter =
         new FilterAndProjectOperator(
+            codegenContext,
             operatorContext,
             inputOperator,
             filterOutputDataTypes,