You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/25 07:54:49 UTC

[iotdb] branch master updated: [IOTDB-3723] Replace FilterOperator with FilterAndProjectOperator and use batch processing for better performance (#6714)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b7dc520a7 [IOTDB-3723] Replace FilterOperator with FilterAndProjectOperator and use batch processing for better performance (#6714)
2b7dc520a7 is described below

commit 2b7dc520a7f6bd56e96b1fe13ea3945ddc68a712
Author: Liao Lanyu <10...@users.noreply.github.com>
AuthorDate: Mon Jul 25 15:54:43 2022 +0800

    [IOTDB-3723] Replace FilterOperator with FilterAndProjectOperator and use batch processing for better performance (#6714)
---
 .../db/it/alignbydevice/IoTDBAlignByDeviceIT.java  |  21 +-
 .../withoutNull/IoTDBWithoutNullAnyFilterIT.java   |  14 +-
 .../commons/udf/builtin/String/UDTFEndsWith.java   |   3 +
 .../commons/udf/builtin/String/UDTFLower.java      |   3 +
 .../commons/udf/builtin/String/UDTFUpper.java      |   3 +
 .../apache/iotdb/commons/udf/builtin/UDTFAbs.java  |   3 +
 .../apache/iotdb/commons/udf/builtin/UDTFMath.java |   3 +
 .../iotdb/commons/udf/builtin/UDTFOnOff.java       |   3 +
 .../operator/process/FilterAndProjectOperator.java | 193 ++++++++++++++++
 .../execution/operator/process/FilterOperator.java | 243 ---------------------
 .../iotdb/db/mpp/plan/expression/Expression.java   |  15 ++
 .../plan/expression/binary/AdditionExpression.java |  12 +
 .../plan/expression/binary/BinaryExpression.java   |  73 +++++++
 .../plan/expression/binary/DivisionExpression.java |  12 +
 .../plan/expression/binary/EqualToExpression.java  |  11 +
 .../expression/binary/GreaterEqualExpression.java  |  12 +
 .../expression/binary/GreaterThanExpression.java   |  12 +
 .../expression/binary/LessEqualExpression.java     |  12 +
 .../plan/expression/binary/LessThanExpression.java |  12 +
 .../plan/expression/binary/LogicAndExpression.java |  11 +
 .../plan/expression/binary/LogicOrExpression.java  |  11 +
 .../plan/expression/binary/ModuloExpression.java   |  12 +
 .../binary/MultiplicationExpression.java           |  12 +
 .../plan/expression/binary/NonEqualExpression.java |  12 +
 .../expression/binary/SubtractionExpression.java   |  12 +
 .../mpp/plan/expression/leaf/ConstantOperand.java  |  31 +++
 .../db/mpp/plan/expression/leaf/LeafOperand.java   |   6 +
 .../plan/expression/leaf/TimeSeriesOperand.java    |  30 +++
 .../mpp/plan/expression/leaf/TimestampOperand.java |  29 +++
 .../plan/expression/multi/FunctionExpression.java  | 104 ++++++++-
 .../plan/expression/ternary/BetweenExpression.java |  25 +++
 .../plan/expression/ternary/TernaryExpression.java |  92 ++++++++
 .../db/mpp/plan/expression/unary/InExpression.java |   9 +
 .../plan/expression/unary/IsNullExpression.java    |  14 ++
 .../mpp/plan/expression/unary/LikeExpression.java  |   9 +
 .../plan/expression/unary/LogicNotExpression.java  |   9 +
 .../plan/expression/unary/NegationExpression.java  |   9 +
 .../plan/expression/unary/RegularExpression.java   |   9 +
 .../mpp/plan/expression/unary/UnaryExpression.java |  60 +++++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 121 +++++++++-
 .../mpp/transformation/dag/column/ColumnCache.java |  56 +++++
 .../dag/column/ColumnTransformer.java              |  78 +++++++
 .../ArithmeticAdditionColumnTransformer.java}      |  25 +--
 .../binary/ArithmeticBinaryColumnTransformer.java  |  57 +++++
 .../ArithmeticDivisionColumnTransformer.java}      |  25 +--
 .../binary/ArithmeticModuloColumnTransformer.java} |  25 +--
 ...ArithmeticMultiplicationColumnTransformer.java} |  24 +-
 .../ArithmeticSubtractionColumnTransformer.java}   |  25 +--
 .../dag/column/binary/BinaryColumnTransformer.java |  65 ++++++
 .../binary/CompareBinaryColumnTransformer.java     |  89 ++++++++
 .../binary/CompareEqualToColumnTransformer.java    |  47 ++++
 .../CompareGreaterEqualColumnTransformer.java}     |  25 +--
 .../CompareGreaterThanColumnTransformer.java}      |  25 +--
 .../binary/CompareLessEqualColumnTransformer.java} |  25 +--
 .../binary/CompareLessThanColumnTransformer.java}  |  25 +--
 .../binary/CompareNonEqualColumnTransformer.java   |  47 ++++
 .../column/binary/LogicAndColumnTransformer.java}  |  25 +--
 .../binary/LogicBinaryColumnTransformer.java       |  65 ++++++
 .../column/binary/LogicOrColumnTransformer.java}   |  25 +--
 .../column/leaf/ConstantColumnTransformer.java}    |  26 +--
 .../column/leaf/IdentityColumnTransformer.java}    |  31 +--
 .../dag/column/leaf/LeafColumnTransformer.java}    |  28 +--
 .../dag/column/leaf/TimeColumnTransformer.java}    |  24 +-
 .../column/multi/MappableUDFColumnTransformer.java |  91 ++++++++
 .../column/ternary/BetweenColumnTransformer.java   |  93 ++++++++
 .../ternary/CompareTernaryColumnTransformer.java   |  74 +++++++
 .../column/ternary/TernaryColumnTransformer.java   |  44 ++++
 .../unary/ArithmeticNegationColumnTransformer.java |  51 +++++
 .../dag/column/unary/InColumnTransformer.java      | 207 ++++++++++++++++++
 .../dag/column/unary/IsNullColumnTransformer.java} |  29 +--
 .../column/unary/LogicNotColumnTransformer.java    |  52 +++++
 .../dag/column/unary/RegularColumnTransformer.java |  61 ++++++
 .../dag/column/unary/UnaryColumnTransformer.java   |  51 +++++
 .../dag/transformer/Transformer.java               |  21 --
 .../binary/CompareBinaryTransformer.java           |  21 --
 .../binary/CompareEqualToTransformer.java          |   3 +-
 .../binary/CompareGreaterEqualTransformer.java     |   3 +-
 .../binary/CompareGreaterThanTransformer.java      |   3 +-
 .../binary/CompareLessEqualTransformer.java        |   3 +-
 .../binary/CompareLessThanTransformer.java         |   3 +-
 .../binary/CompareNonEqualTransformer.java         |   3 +-
 .../transformer/ternary/BetweenTransformer.java    |   5 +-
 .../mpp/transformation/dag/udf/UDTFExecutor.java   |  29 ++-
 ...eInferrer.java => UDTFInformationInferrer.java} |  64 ++++--
 .../transformation/dag/util/TransformUtils.java    |  99 +++++++++
 .../iotdb/tsfile/read/common/type/BinaryType.java  |  55 +++++
 .../iotdb/tsfile/read/common/type/BooleanType.java |  55 +++++
 .../iotdb/tsfile/read/common/type/DoubleType.java  |  85 +++++++
 .../iotdb/tsfile/read/common/type/FloatType.java   |  85 +++++++
 .../iotdb/tsfile/read/common/type/IntType.java     |  85 +++++++
 .../iotdb/tsfile/read/common/type/LongType.java    |  85 +++++++
 .../apache/iotdb/tsfile/read/common/type/Type.java | 104 +++++++++
 .../iotdb/tsfile/read/common/type/TypeEnum.java    |  26 +--
 .../iotdb/tsfile/read/common/type/TypeFactory.java |  40 ++--
 94 files changed, 3179 insertions(+), 625 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
index e066d15403..142185bdb4 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBAlignByDeviceIT {
 
   private static String[] sqls =
@@ -127,6 +126,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectTest() {
     String[] retArray =
         new String[] {
@@ -194,6 +194,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectWithDuplicatedPathsTest() {
     String[] retArray =
         new String[] {
@@ -251,6 +252,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectLimitTest() {
     String[] retArray =
         new String[] {
@@ -304,6 +306,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectSlimitTest2() {
     String[] retArray =
         new String[] {
@@ -348,6 +351,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectSlimitTest() {
     String[] retArray =
         new String[] {
@@ -404,6 +408,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectWithValueFilterTest() {
     String[] retArray =
         new String[] {
@@ -460,6 +465,8 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category(ClusterIT.class)
+  // Result differs from the old standalone version, so only the cluster category is used
   public void selectDifferentSeriesWithValueFilterWithoutCacheTest() {
     String[] retArray =
         new String[] {
@@ -469,6 +476,7 @@ public class IoTDBAlignByDeviceIT {
           "103,root.vehicle.d0,99,",
           "104,root.vehicle.d0,90,",
           "105,root.vehicle.d0,99,",
+          "946684800000,root.vehicle.d0,null",
         };
 
     try (Connection connection = EnvFactory.getEnv().getConnection();
@@ -508,6 +516,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectDifferentSeriesWithBinaryValueFilterWithoutCacheTest() {
     String[] retArray =
         new String[] {
@@ -552,6 +561,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void aggregateTest() {
     String[] retArray =
         new String[] {"root.vehicle.d0,11,11,6,6,1,", "root.vehicle.d1,2,null,null,null,null,"};
@@ -600,6 +610,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void groupByTimeTest() {
     String[] retArray =
         new String[] {
@@ -655,6 +666,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void groupByTimeWithValueFilterTest() {
     String[] retArray =
         new String[] {
@@ -700,6 +712,7 @@ public class IoTDBAlignByDeviceIT {
 
   @Ignore
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void fillTest() {
     String[] retArray =
         new String[] {
@@ -751,6 +764,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void errorCaseTest3() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
@@ -773,6 +787,7 @@ public class IoTDBAlignByDeviceIT {
    * count(root.vehicle.d0.s0) INT64 count(root.vehicle.d1.s0) INT64 count(root.other.d1.s0) INT64
    */
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void unusualCaseTest1() {
     String[] retArray =
         new String[] {"root.other.d1,1,", "root.vehicle.d0,11,", "root.vehicle.d1,2,"};
@@ -810,6 +825,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void unusualCaseTest2() {
     String[] retArray =
         new String[] {
@@ -869,6 +885,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectNonExistTest() {
     String[] retArray =
         new String[] {
@@ -957,6 +974,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectWithRegularExpressionTest() {
     String[] retArray =
         new String[] {
@@ -1024,6 +1042,7 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void selectWithNonExistMeasurementInWhereClause() {
     String[] retArray = new String[] {"1,root.vehicle.d0,101,1101,null,null,null,"};
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/withoutNull/IoTDBWithoutNullAnyFilterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/withoutNull/IoTDBWithoutNullAnyFilterIT.java
index 73fb29d090..bce708ae7d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/withoutNull/IoTDBWithoutNullAnyFilterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/withoutNull/IoTDBWithoutNullAnyFilterIT.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.it.withoutNull;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.itbase.category.ClusterIT;
 
-import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -101,7 +101,7 @@ public class IoTDBWithoutNullAnyFilterIT {
 
   private static final String TIMESTAMP_STR = "Time";
 
-  private void prepareData() {
+  private static void prepareData() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
@@ -114,14 +114,14 @@ public class IoTDBWithoutNullAnyFilterIT {
     }
   }
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     EnvFactory.getEnv().initBeforeTest();
     prepareData();
   }
 
-  @After
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDown() throws Exception {
     EnvFactory.getEnv().cleanAfterTest();
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
index 30d96cfc15..bd6643d933 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
@@ -55,6 +55,9 @@ public class UDTFEndsWith implements UDTF {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     return row.getString(0).endsWith(target);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
index b0f6c211a0..9fb74a24d3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
@@ -53,6 +53,9 @@ public class UDTFLower implements UDTF {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     return Binary.valueOf(row.getString(0).toLowerCase());
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
index defedd5a01..c97ef51292 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
@@ -53,6 +53,9 @@ public class UDTFUpper implements UDTF {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     return Binary.valueOf(row.getString(0).toUpperCase());
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
index e14062f9d0..5e1a0cf96a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
@@ -73,6 +73,9 @@ public class UDTFAbs extends UDTFMath {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     switch (dataType) {
       case INT32:
         return Math.abs(row.getInt(0));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
index d2cecf0ecf..1ce8ed210a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
@@ -96,6 +96,9 @@ public abstract class UDTFMath implements UDTF {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     switch (dataType) {
       case INT32:
         return transformer.transform(row.getInt(0));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
index f142c4ea1b..0917f4f000 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
@@ -90,6 +90,9 @@ public class UDTFOnOff implements UDTF {
 
   @Override
   public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
     switch (dataType) {
       case INT32:
         return row.getInt(0) >= threshold;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
new file mode 100644
index 0000000000..8dcf57797f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FilterAndProjectOperator implements ProcessOperator {
+
+  private final Operator inputOperator;
+
+  private List<LeafColumnTransformer> filterLeafColumnTransformerList;
+
+  private ColumnTransformer filterOutputTransformer;
+
+  private List<ColumnTransformer> commonTransformerList;
+
+  private List<LeafColumnTransformer> projectLeafColumnTransformerList;
+
+  private List<ColumnTransformer> projectOutputTransformerList;
+
+  private final TsBlockBuilder filterTsBlockBuilder;
+
+  private final boolean hasNonMappableUDF;
+
+  private final OperatorContext operatorContext;
+
+  public FilterAndProjectOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> filterOutputDataTypes,
+      List<LeafColumnTransformer> filterLeafColumnTransformerList,
+      ColumnTransformer filterOutputTransformer,
+      List<ColumnTransformer> commonTransformerList,
+      List<LeafColumnTransformer> projectLeafColumnTransformerList,
+      List<ColumnTransformer> projectOutputTransformerList,
+      boolean hasNonMappableUDF) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.filterLeafColumnTransformerList = filterLeafColumnTransformerList;
+    this.filterOutputTransformer = filterOutputTransformer;
+    this.commonTransformerList = commonTransformerList;
+    this.projectLeafColumnTransformerList = projectLeafColumnTransformerList;
+    this.projectOutputTransformerList = projectOutputTransformerList;
+    this.hasNonMappableUDF = hasNonMappableUDF;
+    this.filterTsBlockBuilder = new TsBlockBuilder(8, filterOutputDataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock input = inputOperator.next();
+    if (input == null) {
+      return null;
+    }
+
+    TsBlock filterResult = getFilterTsBlock(input);
+
+    // contains non-mappable udf, we leave calculation for TransformOperator
+    if (hasNonMappableUDF) {
+      return filterResult;
+    }
+    return getTransformedTsBlock(filterResult);
+  }
+
+  /**
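+   * Returns the TsBlock that contains both the initial input columns and the columns of common
+   * subexpressions, keeping only the rows that pass the filter.
+   *
+   * @param input the TsBlock obtained from the input operator
+   * @return a TsBlock holding the rows whose filter value is non-null and true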
+   */
+  private TsBlock getFilterTsBlock(TsBlock input) {
+    final TimeColumn originTimeColumn = input.getTimeColumn();
+    final int positionCount = originTimeColumn.getPositionCount();
+    // feed Filter ColumnTransformer, including TimeStampColumnTransformer and constant
+    for (LeafColumnTransformer leafColumnTransformer : filterLeafColumnTransformerList) {
+      leafColumnTransformer.initFromTsBlock(input);
+    }
+
+    filterOutputTransformer.tryEvaluate();
+
+    Column filterColumn = filterOutputTransformer.getColumn();
+
+    // reuse this builder
+    filterTsBlockBuilder.reset();
+
+    final TimeColumnBuilder timeBuilder = filterTsBlockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] columnBuilders = filterTsBlockBuilder.getValueColumnBuilders();
+
+    List<Column> resultColumns = new ArrayList<>();
+    for (int i = 0, n = input.getValueColumnCount(); i < n; i++) {
+      resultColumns.add(input.getColumn(i));
+    }
+
+    // todo: remove this if, add calculated common sub expressions anyway
+    if (!hasNonMappableUDF) {
+      // get result of calculated common sub expressions
+      for (ColumnTransformer columnTransformer : commonTransformerList) {
+        resultColumns.add(columnTransformer.getColumn());
+      }
+    }
+
+    // construct result TsBlock of filter
+    int rowCount = 0;
+    for (int i = 0, n = resultColumns.size(); i < n; i++) {
+      for (int j = 0; j < positionCount; j++) {
+        if (!filterColumn.isNull(j) && filterColumn.getBoolean(j)) {
+          if (i == 0) {
+            rowCount++;
+            timeBuilder.writeLong(originTimeColumn.getLong(j));
+          }
+          Column curColumn = resultColumns.get(i);
+          if (curColumn.isNull(j)) {
+            columnBuilders[i].appendNull();
+          } else {
+            columnBuilders[i].write(curColumn, j);
+          }
+        }
+      }
+    }
+
+    filterTsBlockBuilder.declarePositions(rowCount);
+    return filterTsBlockBuilder.build();
+  }
+
+  private TsBlock getTransformedTsBlock(TsBlock input) {
+    final TimeColumn originTimeColumn = input.getTimeColumn();
+    final int positionCount = originTimeColumn.getPositionCount();
+    // feed pre calculated data
+    for (LeafColumnTransformer leafColumnTransformer : projectLeafColumnTransformerList) {
+      leafColumnTransformer.initFromTsBlock(input);
+    }
+
+    List<Column> resultColumns = new ArrayList<>();
+    for (ColumnTransformer columnTransformer : projectOutputTransformerList) {
+      columnTransformer.tryEvaluate();
+      resultColumns.add(columnTransformer.getColumn());
+    }
+    return TsBlock.wrapBlocksWithoutCopy(
+        positionCount, originTimeColumn, resultColumns.toArray(new Column[0]));
+  }
+
+  @Override
+  public boolean hasNext() {
+    return inputOperator.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return inputOperator.isFinished();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return inputOperator.isBlocked();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
deleted file mode 100644
index 7e6a440c94..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.operator.process;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
-import org.apache.iotdb.db.mpp.transformation.api.YieldableState;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class FilterOperator extends TransformOperator {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(FilterOperator.class);
-
-  private LayerPointReader filterPointReader;
-
-  public FilterOperator(
-      OperatorContext operatorContext,
-      Operator inputOperator,
-      List<TSDataType> inputDataTypes,
-      Map<String, List<InputLocation>> inputLocations,
-      Expression filterExpression,
-      Expression[] outputExpressions,
-      boolean keepNull,
-      ZoneId zoneId,
-      TypeProvider typeProvider,
-      boolean isAscending)
-      throws QueryProcessException, IOException {
-    super(
-        operatorContext,
-        inputOperator,
-        inputDataTypes,
-        inputLocations,
-        bindExpressions(filterExpression, outputExpressions),
-        keepNull,
-        zoneId,
-        typeProvider,
-        isAscending);
-  }
-
-  private static Expression[] bindExpressions(
-      Expression filterExpression, Expression[] outputExpressions) {
-    Expression[] expressions = new Expression[outputExpressions.length + 1];
-    System.arraycopy(outputExpressions, 0, expressions, 0, outputExpressions.length);
-    expressions[expressions.length - 1] = filterExpression;
-    return expressions;
-  }
-
-  @Override
-  protected void initTransformers(
-      Map<String, List<InputLocation>> inputLocations,
-      Expression[] outputExpressions,
-      TypeProvider typeProvider)
-      throws QueryProcessException, IOException {
-    super.initTransformers(inputLocations, outputExpressions, typeProvider);
-
-    filterPointReader = transformers[transformers.length - 1];
-    if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
-      throw new UnSupportedDataTypeException(
-          String.format(
-              "Data type of the filter expression should be BOOLEAN, but %s is received.",
-              filterPointReader.getDataType()));
-    }
-  }
-
-  @Override
-  public TsBlock next() {
-
-    try {
-      YieldableState yieldableState = iterateAllColumnsToNextValid();
-      if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-        return null;
-      }
-
-      final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
-      final int outputColumnCount = transformers.length - 1;
-      if (outputDataTypes == null) {
-        outputDataTypes = new ArrayList<>();
-        for (int i = 0; i < outputColumnCount; ++i) {
-          outputDataTypes.add(transformers[i].getDataType());
-        }
-      }
-      tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
-      final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-      final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-
-      int rowCount = 0;
-      while (!timeHeap.isEmpty()) {
-        final long currentTime = timeHeap.pollFirst();
-
-        yieldableState = filterPointReader.yield();
-        if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-          timeHeap.add(currentTime);
-          tsBlockBuilder.declarePositions(rowCount);
-          return tsBlockBuilder.build();
-        }
-
-        if (yieldableState == YieldableState.YIELDABLE
-            && filterPointReader.currentTime() == currentTime) {
-
-          boolean isReaderContinueNull = true;
-          for (int i = 0; isReaderContinueNull && i < outputColumnCount; ++i) {
-            isReaderContinueNull = collectReaderAppendIsNull(transformers[i], currentTime);
-          } // After the loop, isReaderContinueNull is true means all values of readers are null
-
-          if (!filterPointReader.isCurrentNull()
-              && filterPointReader.currentBoolean()
-              && !isReaderContinueNull) {
-            // time
-            timeBuilder.writeLong(currentTime);
-
-            // values
-            for (int i = 0; i < outputColumnCount; ++i) {
-              yieldableState = collectDataPoint(transformers[i], columnBuilders[i], currentTime, i);
-              if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-                for (int j = 0; j <= i; ++j) {
-                  shouldIterateReadersToNextValid[j] = false;
-                }
-                timeHeap.add(currentTime);
-
-                tsBlockBuilder.declarePositions(rowCount);
-                return tsBlockBuilder.build();
-              }
-            }
-            shouldIterateReadersToNextValid[outputColumnCount] = true;
-
-            for (int i = 0; i <= outputColumnCount; ++i) {
-              if (shouldIterateReadersToNextValid[i]) {
-                transformers[i].readyForNext();
-              }
-            }
-
-            ++rowCount;
-          } else {
-            // values
-            for (int i = 0; i < outputColumnCount; ++i) {
-              yieldableState = skipDataPoint(transformers[i], currentTime, i);
-              if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-                for (int j = 0; j <= i; ++j) {
-                  shouldIterateReadersToNextValid[j] = false;
-                }
-                timeHeap.add(currentTime);
-
-                tsBlockBuilder.declarePositions(rowCount);
-                return tsBlockBuilder.build();
-              }
-            }
-            shouldIterateReadersToNextValid[outputColumnCount] = true;
-
-            for (int i = 0; i <= outputColumnCount; ++i) {
-              if (shouldIterateReadersToNextValid[i]) {
-                transformers[i].readyForNext();
-              }
-            }
-          }
-        } else {
-          // values
-          for (int i = 0; i < outputColumnCount; ++i) {
-            yieldableState = skipDataPoint(transformers[i], currentTime, i);
-            if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-              for (int j = 0; j <= i; ++j) {
-                shouldIterateReadersToNextValid[j] = false;
-              }
-              timeHeap.add(currentTime);
-
-              tsBlockBuilder.declarePositions(rowCount);
-              return tsBlockBuilder.build();
-            }
-          }
-
-          for (int i = 0; i < outputColumnCount; ++i) {
-            if (shouldIterateReadersToNextValid[i]) {
-              transformers[i].readyForNext();
-            }
-          }
-        }
-
-        yieldableState = iterateAllColumnsToNextValid();
-        if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-          tsBlockBuilder.declarePositions(rowCount);
-          return tsBlockBuilder.build();
-        }
-
-        inputLayer.updateRowRecordListEvictionUpperBound();
-      }
-
-      tsBlockBuilder.declarePositions(rowCount);
-      return tsBlockBuilder.build();
-    } catch (Exception e) {
-      LOGGER.error("FilterOperator#next()", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  private YieldableState skipDataPoint(LayerPointReader reader, long currentTime, int readerIndex)
-      throws IOException, QueryProcessException {
-    final YieldableState yieldableState = reader.yield();
-    if (yieldableState != YieldableState.YIELDABLE) {
-      return yieldableState;
-    }
-
-    if (reader.currentTime() == currentTime) {
-      shouldIterateReadersToNextValid[readerIndex] = true;
-    }
-
-    return YieldableState.YIELDABLE;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/Expression.java
index 3dc95b9485..eae9d7320a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/Expression.java
@@ -49,6 +49,8 @@ import org.apache.iotdb.db.mpp.plan.expression.unary.LogicNotExpression;
 import org.apache.iotdb.db.mpp.plan.expression.unary.NegationExpression;
 import org.apache.iotdb.db.mpp.plan.expression.unary.RegularExpression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.memory.LayerMemoryAssigner;
@@ -168,6 +170,19 @@ public abstract class Expression {
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException;
 
+  public abstract boolean isMappable(TypeProvider typeProvider); // overrides in this patch: LeafOperand returns true, BinaryExpression requires both operands; exact meaning of "mappable" -- TODO confirm
+
+  public abstract ColumnTransformer constructColumnTransformer( // builds the ColumnTransformer for this expression; notes below reflect the overrides visible in this patch
+      UDTFContext udtfContext,
+      TypeProvider typeProvider, // overrides resolve result types via getType(getExpressionString())
+      List<LeafColumnTransformer> leafList, // overrides append the leaf transformers they create
+      Map<String, List<InputLocation>> inputLocations,
+      Map<Expression, ColumnTransformer> cache, // expression -> transformer already built; consulted before anything is constructed
+      Map<Expression, ColumnTransformer> hasSeen, // presumably transformers produced by an earlier construction pass -- TODO confirm against the caller
+      List<ColumnTransformer> commonTransformerList, // BinaryExpression appends transformers it reuses from hasSeen
+      List<TSDataType> inputDataTypes, // BinaryExpression appends one type per transformer added to commonTransformerList
+      int originSize); // added to commonTransformerList.size() to index a reused transformer's result
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // isConstantOperand
   /////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/AdditionExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/AdditionExpression.java
index d4c80fe955..9aaaed7778 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/AdditionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/AdditionExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticAdditionTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticBinaryTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -37,6 +40,15 @@ public class AdditionExpression extends ArithmeticBinaryExpression {
     super(byteBuffer);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // addition's hook: pairs both operand transformers in an ArithmeticAdditionColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new ArithmeticAdditionColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
index 8f6a5e1528..3ba78df364 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.IdentityColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.SingleInputColumnMultiReferenceIntermediateLayer;
@@ -37,6 +40,8 @@ import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -316,6 +321,74 @@ public abstract class BinaryExpression extends Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public boolean isMappable(TypeProvider typeProvider) { // true only when both the left and the right operand report mappable
+    return leftExpression.isMappable(typeProvider) && rightExpression.isMappable(typeProvider);
+  }
+
+  @Override
+  public ColumnTransformer constructColumnTransformer( // builds this expression's transformer at most once per cache, then hands out the cached instance
+      UDTFContext udtfContext,
+      TypeProvider typeProvider,
+      List<LeafColumnTransformer> leafList,
+      Map<String, List<InputLocation>> inputLocations,
+      Map<Expression, ColumnTransformer> cache,
+      Map<Expression, ColumnTransformer> hasSeen,
+      List<ColumnTransformer> commonTransformerList,
+      List<TSDataType> inputDataTypes,
+      int originSize) {
+    if (!cache.containsKey(this)) { // nothing cached yet for this expression
+      if (hasSeen.containsKey(this)) { // hasSeen already holds a transformer for it: reference that one instead of rebuilding
+        IdentityColumnTransformer identity =
+            new IdentityColumnTransformer(
+                TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                originSize + commonTransformerList.size()); // slot the reused transformer is about to take in commonTransformerList, shifted by originSize
+        ColumnTransformer columnTransformer = hasSeen.get(this);
+        columnTransformer.addReferenceCount(); // the reuse is counted on the shared transformer itself
+        commonTransformerList.add(columnTransformer);
+        leafList.add(identity); // the identity stand-in is tracked alongside the other leaf transformers
+        inputDataTypes.add(typeProvider.getType(getExpressionString())); // one type appended for the one transformer appended above
+        cache.put(this, identity); // later lookups in this cache resolve to the identity stand-in
+      } else { // not in hasSeen either: construct the left operand, then the right, then combine them
+        ColumnTransformer leftColumnTransformer =
+            leftExpression.constructColumnTransformer(
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+        ColumnTransformer rightColumnTransformer =
+            rightExpression.constructColumnTransformer(
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+        cache.put(
+            this,
+            getConcreteBinaryColumnTransformer( // operator-specific transformer supplied by the concrete subclass
+                leftColumnTransformer,
+                rightColumnTransformer,
+                TypeFactory.getType(typeProvider.getType(getExpressionString())))); // result type looked up by this expression's string form
+      }
+    }
+
+    ColumnTransformer res = cache.get(this);
+    res.addReferenceCount(); // every call, cache hit or fresh build, adds one reference to the returned transformer
+    return res;
+  }
+
+  protected abstract ColumnTransformer getConcreteBinaryColumnTransformer( // subclass hook: wrap the two operand transformers in the operator-specific transformer
+      ColumnTransformer leftColumnTransformer, ColumnTransformer rightColumnTransformer, Type type); // type is the result type resolved in constructColumnTransformer
+
   protected abstract BinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/DivisionExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/DivisionExpression.java
index 26ecfdb071..3de8f65909 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/DivisionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/DivisionExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticDivisionColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticDivisionTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -37,6 +40,15 @@ public class DivisionExpression extends ArithmeticBinaryExpression {
     super(byteBuffer);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // division's hook: pairs both operand transformers in an ArithmeticDivisionColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new ArithmeticDivisionColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/EqualToExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/EqualToExpression.java
index a9a683ce87..8afacb03da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/EqualToExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/EqualToExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareEqualToColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareEqualToTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,14 @@ public class EqualToExpression extends CompareBinaryExpression {
     return new CompareEqualToTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // equality's hook: pairs both operand transformers in a CompareEqualToColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareEqualToColumnTransformer(type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "=";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterEqualExpression.java
index d87a42046f..2b5a57e3f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterEqualExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareGreaterEqualColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareGreaterEqualTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -44,6 +47,15 @@ public class GreaterEqualExpression extends CompareBinaryExpression {
         leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // greater-or-equal hook: pairs both operand transformers in a CompareGreaterEqualColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareGreaterEqualColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return ">=";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterThanExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterThanExpression.java
index 848557695c..0406279b11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterThanExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/GreaterThanExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareGreaterThanColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareGreaterThanTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -44,6 +47,15 @@ public class GreaterThanExpression extends CompareBinaryExpression {
         leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // greater-than hook: pairs both operand transformers in a CompareGreaterThanColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareGreaterThanColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return ">";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessEqualExpression.java
index 36c9e12294..53bec2b992 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessEqualExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareLessEqualTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,15 @@ public class LessEqualExpression extends CompareBinaryExpression {
     return new CompareLessEqualTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // less-or-equal hook: pairs both operand transformers in a CompareLessEqualColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareLessEqualColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "<=";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessThanExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessThanExpression.java
index 630b7a4edc..aedc252bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessThanExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LessThanExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessThanColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareLessThanTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,15 @@ public class LessThanExpression extends CompareBinaryExpression {
     return new CompareLessThanTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // less-than hook: pairs both operand transformers in a CompareLessThanColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareLessThanColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "<";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicAndExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicAndExpression.java
index 68addddc49..b55491776d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicAndExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicAndExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.LogicAndColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.LogicAndTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.LogicBinaryTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,14 @@ public class LogicAndExpression extends LogicBinaryExpression {
     return new LogicAndTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // logical-and hook: pairs both operand transformers in a LogicAndColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new LogicAndColumnTransformer(type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "&";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicOrExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicOrExpression.java
index 785263b210..8c091e5287 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicOrExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/LogicOrExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.LogicOrColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.LogicBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.LogicOrTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,14 @@ public class LogicOrExpression extends LogicBinaryExpression {
     return new LogicOrTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // logical-or hook: pairs both operand transformers in a LogicOrColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new LogicOrColumnTransformer(type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "|";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/ModuloExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/ModuloExpression.java
index 24f7ab6b40..b2faae3b75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/ModuloExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/ModuloExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticModuloColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticModuloTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -37,6 +40,15 @@ public class ModuloExpression extends ArithmeticBinaryExpression {
     super(byteBuffer);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // modulo's hook: pairs both operand transformers in an ArithmeticModuloColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new ArithmeticModuloColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/MultiplicationExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/MultiplicationExpression.java
index adc381612d..d1aa27e093 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/MultiplicationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/MultiplicationExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticMultiplicationColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticMultiplicationTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -37,6 +40,15 @@ public class MultiplicationExpression extends ArithmeticBinaryExpression {
     super(byteBuffer);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // multiplication's hook: pairs both operand transformers in an ArithmeticMultiplicationColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new ArithmeticMultiplicationColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/NonEqualExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/NonEqualExpression.java
index 2a25f392f2..396cd1c0fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/NonEqualExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/NonEqualExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareNonEqualColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.CompareNonEqualTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -43,6 +46,15 @@ public class NonEqualExpression extends CompareBinaryExpression {
     return new CompareNonEqualTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // inequality's hook: pairs both operand transformers in a CompareNonEqualColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new CompareNonEqualColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected String operator() {
     return "!=";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/SubtractionExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/SubtractionExpression.java
index 946179f782..b9bfe65c17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/SubtractionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/SubtractionExpression.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.mpp.plan.expression.binary;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticSubtractionColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.binary.ArithmeticSubtractionTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -37,6 +40,15 @@ public class SubtractionExpression extends ArithmeticBinaryExpression {
     super(byteBuffer);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteBinaryColumnTransformer( // subtraction's hook: pairs both operand transformers in an ArithmeticSubtractionColumnTransformer
+      ColumnTransformer leftColumnTransformer,
+      ColumnTransformer rightColumnTransformer,
+      Type type) { // forwarded as the first constructor argument
+    return new ArithmeticSubtractionColumnTransformer(
+        type, leftColumnTransformer, rightColumnTransformer);
+  }
+
   @Override
   protected ArithmeticBinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/ConstantOperand.java
index c8da18f69e..452e042240 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/ConstantOperand.java
@@ -25,13 +25,18 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.ConstantIntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.memory.LayerMemoryAssigner;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang3.Validate;
@@ -154,6 +159,32 @@ public class ConstantOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public ColumnTransformer constructColumnTransformer( // creates the constant's transformer once per cache; only typeProvider, leafList and cache are used here
+      UDTFContext udtfContext,
+      TypeProvider typeProvider,
+      List<LeafColumnTransformer> leafList,
+      Map<String, List<InputLocation>> inputLocations,
+      Map<Expression, ColumnTransformer> cache,
+      Map<Expression, ColumnTransformer> hasSeen,
+      List<ColumnTransformer> commonTransformerList,
+      List<TSDataType> inputDataTypes,
+      int originSize) {
+    ColumnTransformer res =
+        cache.computeIfAbsent( // the lambda below runs only when this operand is not in cache yet
+            this,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                      TransformUtils.transformConstantOperandToColumn(this));
+              leafList.add(columnTransformer); // registered as a leaf on first construction only
+              return columnTransformer;
+            });
+    res.addReferenceCount(); // incremented on every call, cache hits included
+    return res;
+  }
+
   @Override
   public String getExpressionStringInternal() {
     return valueString;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
index 9f4bc80438..ab36096756 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.expression.leaf;
 
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
 
@@ -39,4 +40,9 @@ public abstract class LeafOperand extends Expression {
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
     // nothing to do
   }
+
+  @Override
+  public boolean isMappable(TypeProvider typeProvider) { // typeProvider is not consulted here
+    return true; // every LeafOperand subclass inherits an unconditional "mappable" answer
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
index 28963ad7f5..3ac07b4a84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.IdentityColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.SingleInputColumnMultiReferenceIntermediateLayer;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.memory.LayerMemoryAssigner;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -170,6 +174,32 @@ public class TimeSeriesOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public ColumnTransformer constructColumnTransformer( // returns the transformer for this series, creating it on first use
+      UDTFContext udtfContext, // not referenced by this override
+      TypeProvider typeProvider, // supplies the data type registered under this expression's string
+      List<LeafColumnTransformer> leafList, // receives the transformer when it is newly created
+      Map<String, List<InputLocation>> inputLocations, // expression string -> locations of that column in the input
+      Map<Expression, ColumnTransformer> cache, // expression -> transformer already built during this construction
+      Map<Expression, ColumnTransformer> hasSeen, // not referenced by this override
+      List<ColumnTransformer> commonTransformerList, // not referenced by this override
+      List<TSDataType> inputDataTypes, // not referenced by this override
+      int originSize) { // not referenced by this override
+    ColumnTransformer res =
+        cache.computeIfAbsent( // the lambda below runs only when cache has no entry for this operand
+            this,
+            e -> { // the lambda argument is unused; the body refers to `this` directly
+              IdentityColumnTransformer identity =
+                  new IdentityColumnTransformer(
+                      TypeFactory.getType(typeProvider.getType(getExpressionString())), // type is looked up by expression string
+                      inputLocations.get(getExpressionString()).get(0).getValueColumnIndex()); // NOTE(review): assumes a non-empty entry exists for this expression string; only the first location is used — confirm
+              leafList.add(identity); // registered as a leaf once, on the cache-miss path only
+              return identity;
+            });
+    res.addReferenceCount(); // called on every invocation, whether the transformer was cached or just created
+    return res;
+  }
+
   @Override
   public String getExpressionStringInternal() {
     return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimestampOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimestampOperand.java
index 7adf1a541c..dc854b9a93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimestampOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimestampOperand.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.SingleInputColumnMultiReferenceIntermediateLayer;
@@ -35,6 +38,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.memory.LayerMemoryAssigner;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -153,6 +157,31 @@ public class TimestampOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public ColumnTransformer constructColumnTransformer( // returns the transformer for the time column, creating it on first use
+      UDTFContext udtfContext, // not referenced by this override
+      TypeProvider typeProvider, // supplies the data type registered under this expression's string
+      List<LeafColumnTransformer> leafList, // receives the transformer when it is newly created
+      Map<String, List<InputLocation>> inputLocations, // not referenced by this override
+      Map<Expression, ColumnTransformer> cache, // expression -> transformer already built during this construction
+      Map<Expression, ColumnTransformer> hasSeen, // not referenced by this override
+      List<ColumnTransformer> commonTransformerList, // not referenced by this override
+      List<TSDataType> inputDataTypes, // not referenced by this override
+      int originSize) { // not referenced by this override
+    ColumnTransformer res =
+        cache.computeIfAbsent( // the lambda below runs only when cache has no entry for this operand
+            this,
+            e -> { // the lambda argument is unused; the body refers to `this` directly
+              TimeColumnTransformer timeColumnTransformer =
+                  new TimeColumnTransformer(
+                      TypeFactory.getType(typeProvider.getType(getExpressionString()))); // only a type is passed; no input column index is needed here
+              leafList.add(timeColumnTransformer); // registered as a leaf once, on the cache-miss path only
+              return timeColumnTransformer;
+            });
+    res.addReferenceCount(); // called on every invocation, whether the transformer was cached or just created
+    return res;
+  }
+
   @Override
   protected boolean isConstantOperandInternal() {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/multi/FunctionExpression.java
index dd6b4d5605..50c7c439e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/multi/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/multi/FunctionExpression.java
@@ -30,6 +30,10 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.IdentityColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.multi.MappableUDFColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.MultiInputColumnIntermediateLayer;
@@ -44,11 +48,12 @@ import org.apache.iotdb.db.mpp.transformation.dag.transformer.multi.UDFQueryTran
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.TransparentTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFTypeInferrer;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFInformationInferrer;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
 
@@ -263,7 +268,7 @@ public class FunctionExpression extends Expression {
       if (!isBuiltInAggregationFunctionExpression()) {
         typeProvider.setType(
             expressionString,
-            new UDTFTypeInferrer(functionName)
+            new UDTFInformationInferrer(functionName)
                 .inferOutputType(
                     expressions.stream().map(Expression::toString).collect(Collectors.toList()),
                     expressions.stream()
@@ -316,6 +321,88 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public ColumnTransformer constructColumnTransformer( // returns the transformer for this function call, building it on first use
+      UDTFContext udtfContext, // source of the UDTFExecutor bound to this function expression
+      TypeProvider typeProvider, // supplies data types keyed by expression string
+      List<LeafColumnTransformer> leafList, // receives every leaf transformer created here or in child expressions
+      Map<String, List<InputLocation>> inputLocations, // expression string -> locations of that column in the input
+      Map<Expression, ColumnTransformer> cache, // expression -> transformer already built during this construction
+      Map<Expression, ColumnTransformer> hasSeen, // expression -> transformer that already exists outside `cache`
+      List<ColumnTransformer> commonTransformerList, // collects transformers taken from hasSeen for reuse
+      List<TSDataType> inputDataTypes, // extended with one type per transformer appended to commonTransformerList
+      int originSize) { // offset added to the commonTransformerList position to form a column index
+    if (!cache.containsKey(this)) { // build only once per expression; later calls go straight to the lookup below
+      if (hasSeen.containsKey(this)) { // case 1: reuse an existing transformer's output through an identity leaf
+        IdentityColumnTransformer identity =
+            new IdentityColumnTransformer(
+                TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                originSize + commonTransformerList.size()); // computed before the add below, so it matches the slot about to be appended
+        ColumnTransformer columnTransformer = hasSeen.get(this);
+        columnTransformer.addReferenceCount(); // the reused transformer gains a reference of its own
+        commonTransformerList.add(columnTransformer);
+        inputDataTypes.add(typeProvider.getType(getExpressionString())); // kept in step with commonTransformerList
+        leafList.add(identity);
+        cache.put(this, identity);
+      } else {
+        if (isBuiltInAggregationFunctionExpression) { // case 2: aggregation result is read directly from an input column
+          IdentityColumnTransformer identity =
+              new IdentityColumnTransformer(
+                  TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                  inputLocations.get(getExpressionString()).get(0).getValueColumnIndex()); // NOTE(review): assumes a non-empty entry exists for this expression string — confirm
+          leafList.add(identity);
+          cache.put(this, identity);
+        } else { // case 3: a UDF evaluated over the transformers of its argument expressions
+          ColumnTransformer[] inputColumnTransformers =
+              expressions.stream()
+                  .map(
+                      expression ->
+                          expression.constructColumnTransformer( // recurse with the same shared collections
+                              udtfContext,
+                              typeProvider,
+                              leafList,
+                              inputLocations,
+                              cache,
+                              hasSeen,
+                              commonTransformerList,
+                              inputDataTypes,
+                              originSize))
+                  .toArray(ColumnTransformer[]::new);
+
+          TSDataType[] inputTransformerDataTypes =
+              expressions.stream()
+                  .map(expression -> expression.inferTypes(typeProvider)) // one data type per argument, in argument order
+                  .toArray(TSDataType[]::new);
+
+          UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this);
+
+          // Mappable UDF does not need PointCollector, so memoryBudget and queryId is not
+          // needed.
+          executor.beforeStart(
+              0,
+              0,
+              expressions.stream().map(Expression::toString).collect(Collectors.toList()), // argument expressions as strings
+              expressions.stream()
+                  .map(f -> typeProvider.getType(f.toString())) // argument types, looked up by the same strings
+                  .collect(Collectors.toList()),
+              functionAttributes);
+
+          cache.put(
+              this,
+              new MappableUDFColumnTransformer(
+                  TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                  inputColumnTransformers,
+                  inputTransformerDataTypes,
+                  udtfContext.getExecutorByFunctionExpression(this))); // NOTE(review): repeats the lookup already held in `executor` above
+        }
+      }
+    }
+
+    ColumnTransformer res = cache.get(this); // present at this point: every branch above stores an entry
+    res.addReferenceCount(); // called on every invocation, whether the transformer was cached or just created
+    return res;
+  }
+
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
@@ -423,6 +510,19 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public boolean isMappable(TypeProvider typeProvider) { // true when the function's access strategy is MAPPABLE_ROW_BY_ROW
+    return new UDTFInformationInferrer(functionName) // NOTE(review): no special case for built-in aggregation functions here — confirm callers never pass one
+        .getAccessStrategy(
+            expressions.stream().map(Expression::toString).collect(Collectors.toList()), // argument expressions as strings
+            expressions.stream()
+                .map(f -> typeProvider.getType(f.toString())) // argument types, looked up by the same strings
+                .collect(Collectors.toList()),
+            functionAttributes)
+        .getAccessStrategyType()
+        .equals(AccessStrategy.AccessStrategyType.MAPPABLE_ROW_BY_ROW); // the argument expressions themselves are not asked isMappable here
+  }
+
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
index 294bf2edf9..6b71fcdc45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/BetweenExpression.java
@@ -25,9 +25,13 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.BetweenColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.ternary.BetweenTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.ternary.TernaryTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -73,6 +77,20 @@ public class BetweenExpression extends TernaryExpression {
         isNotBetween);
   }
 
+  @Override
+  protected TernaryColumnTransformer getConcreteTernaryTransformer( // factory hook: wraps the three child transformers in a BETWEEN transformer
+      ColumnTransformer firstColumnTransformer,
+      ColumnTransformer secondColumnTransformer,
+      ColumnTransformer thirdColumnTransformer,
+      Type returnType) {
+    return new BetweenColumnTransformer(
+        returnType,
+        firstColumnTransformer, // children are forwarded in the order they were received
+        secondColumnTransformer,
+        thirdColumnTransformer,
+        isNotBetween); // this expression's isNotBetween flag is forwarded unchanged
+  }
+
   @Override
   protected String operator() {
     return "between";
@@ -80,6 +98,13 @@ public class BetweenExpression extends TernaryExpression {
 
   @Override
   public TSDataType inferTypes(TypeProvider typeProvider) { // always BOOLEAN; also records types in typeProvider on first call
+    final String expressionString = toString();
+    if (!typeProvider.containsTypeInfoOf(expressionString)) { // skip the work when this expression is already registered
+      firstExpression.inferTypes(typeProvider); // return values are discarded; the calls are made for the three children in turn
+      secondExpression.inferTypes(typeProvider);
+      thirdExpression.inferTypes(typeProvider);
+      typeProvider.setType(expressionString, TSDataType.BOOLEAN); // register this expression's own result type
+    }
     return TSDataType.BOOLEAN;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/TernaryExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/TernaryExpression.java
index f1498ed804..780ee96851 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/TernaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ternary/TernaryExpression.java
@@ -28,6 +28,10 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.IdentityColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.SingleInputColumnMultiReferenceIntermediateLayer;
@@ -39,6 +43,8 @@ import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -161,6 +167,92 @@ public abstract class TernaryExpression extends Expression {
     thirdExpression.collectPaths(pathSet);
   }
 
+  @Override
+  public boolean isMappable(TypeProvider typeProvider) { // mappable only if all three operands are
+    return firstExpression.isMappable(typeProvider) // && short-circuits: later operands are not asked once one returns false
+        && secondExpression.isMappable(typeProvider)
+        && thirdExpression.isMappable(typeProvider);
+  }
+
+  @Override
+  public ColumnTransformer constructColumnTransformer( // returns the transformer for this ternary expression, building it on first use
+      UDTFContext udtfContext, // only forwarded to the child expressions
+      TypeProvider typeProvider, // supplies data types keyed by expression string
+      List<LeafColumnTransformer> leafList, // receives every leaf transformer created here or in child expressions
+      Map<String, List<InputLocation>> inputLocations, // only forwarded to the child expressions
+      Map<Expression, ColumnTransformer> cache, // expression -> transformer already built during this construction
+      Map<Expression, ColumnTransformer> hasSeen, // expression -> transformer that already exists outside `cache`
+      List<ColumnTransformer> commonTransformerList, // collects transformers taken from hasSeen for reuse
+      List<TSDataType> inputDataTypes, // extended with one type per transformer appended to commonTransformerList
+      int originSize) { // offset added to the commonTransformerList position to form a column index
+    if (!cache.containsKey(this)) { // build only once per expression; later calls go straight to the lookup below
+      if (hasSeen.containsKey(this)) { // case 1: reuse an existing transformer's output through an identity leaf
+        IdentityColumnTransformer identity =
+            new IdentityColumnTransformer(
+                TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                originSize + commonTransformerList.size()); // computed before the add below, so it matches the slot about to be appended
+        ColumnTransformer columnTransformer = hasSeen.get(this);
+        columnTransformer.addReferenceCount(); // the reused transformer gains a reference of its own
+        commonTransformerList.add(columnTransformer);
+        leafList.add(identity);
+        inputDataTypes.add(typeProvider.getType(getExpressionString())); // kept in step with commonTransformerList
+        cache.put(this, identity);
+      } else { // case 2: build the three children, then combine them via the subclass hook
+        ColumnTransformer firstColumnTransformer =
+            firstExpression.constructColumnTransformer( // each child call receives the same shared collections
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+        ColumnTransformer secondColumnTransformer =
+            secondExpression.constructColumnTransformer(
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+        ColumnTransformer thirdColumnTransformer =
+            thirdExpression.constructColumnTransformer(
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+
+        cache.put(
+            this,
+            getConcreteTernaryTransformer( // subclass decides which concrete transformer to instantiate
+                firstColumnTransformer,
+                secondColumnTransformer,
+                thirdColumnTransformer,
+                TypeFactory.getType(typeProvider.getType(getExpressionString())))); // result type is looked up by expression string
+      }
+    }
+
+    ColumnTransformer res = cache.get(this); // present at this point: both branches above store an entry
+    res.addReferenceCount(); // called on every invocation, whether the transformer was cached or just created
+    return res;
+  }
+
+  protected abstract TernaryColumnTransformer getConcreteTernaryTransformer( // hook called by constructColumnTransformer once the children are built
+      ColumnTransformer firstColumnTransformer, // transformer built from firstExpression
+      ColumnTransformer secondColumnTransformer, // transformer built from secondExpression
+      ColumnTransformer thirdColumnTransformer, // transformer built from thirdExpression
+      Type returnType); // type derived from the type registered for this expression's string
+
   @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/InExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/InExpression.java
index 47aa0dd53e..5e97e28875 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/InExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/InExpression.java
@@ -26,9 +26,12 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.InColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.InTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -107,6 +110,12 @@ public class InExpression extends UnaryExpression {
     return new InTransformer(pointReader, isNotIn, values);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: wraps the child transformer in an IN transformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new InColumnTransformer(returnType, childColumnTransformer, isNotIn, values); // isNotIn and values are this expression's own fields, forwarded unchanged
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new InExpression(childExpression, isNotIn, values);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/IsNullExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/IsNullExpression.java
index fc4d6584c9..537f1edc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/IsNullExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/IsNullExpression.java
@@ -23,9 +23,12 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.IsNullColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.IsNullTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -51,6 +54,11 @@ public class IsNullExpression extends UnaryExpression {
 
   @Override
   public TSDataType inferTypes(TypeProvider typeProvider) { // always BOOLEAN; also records types in typeProvider on first call
+    final String expressionString = toString();
+    if (!typeProvider.containsTypeInfoOf(expressionString)) { // skip the work when this expression is already registered
+      expression.inferTypes(typeProvider); // return value is discarded; the call is made for the child expression
+      typeProvider.setType(expressionString, TSDataType.BOOLEAN); // register this expression's own result type
+    }
     return TSDataType.BOOLEAN;
   }
 
@@ -69,6 +77,12 @@ public class IsNullExpression extends UnaryExpression {
     return new IsNullTransformer(pointReader, isNot);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: wraps the child transformer in an IS NULL transformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new IsNullColumnTransformer(returnType, childColumnTransformer, isNot); // this expression's isNot flag is forwarded unchanged
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new IsNullExpression(childExpression, isNot);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LikeExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LikeExpression.java
index d14b2c30a3..e86c484fb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LikeExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LikeExpression.java
@@ -24,9 +24,12 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.RegularColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.RegularTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -146,6 +149,12 @@ public class LikeExpression extends UnaryExpression {
     return new RegularTransformer(pointReader, pattern);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: LIKE is evaluated by a RegularColumnTransformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new RegularColumnTransformer(returnType, childColumnTransformer, pattern); // passes the `pattern` field of this expression
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new LikeExpression(childExpression, patternString, pattern);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LogicNotExpression.java
index 9a7191d497..3f4c91dace 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/LogicNotExpression.java
@@ -27,9 +27,12 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.LogicNotColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.LogicNotTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -48,6 +51,12 @@ public class LogicNotExpression extends UnaryExpression {
     return new LogicNotTransformer(pointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: wraps the child transformer in a logical-NOT transformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new LogicNotColumnTransformer(returnType, childColumnTransformer); // no extra state beyond the child and the return type
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new LogicNotExpression(childExpression);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/NegationExpression.java
index 2b9cba69b8..306e8ddd76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/NegationExpression.java
@@ -27,9 +27,12 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.ArithmeticNegationColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.ArithmeticNegationTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
 import java.nio.ByteBuffer;
 
@@ -48,6 +51,12 @@ public class NegationExpression extends UnaryExpression {
     return new ArithmeticNegationTransformer(pointReader);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: wraps the child transformer in an arithmetic-negation transformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new ArithmeticNegationColumnTransformer(returnType, childColumnTransformer); // no extra state beyond the child and the return type
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new NegationExpression(childExpression);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/RegularExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/RegularExpression.java
index aee30c1a3f..4dcb052055 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/RegularExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/RegularExpression.java
@@ -24,9 +24,12 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.RegularColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.Transformer;
 import org.apache.iotdb.db.mpp.transformation.dag.transformer.unary.RegularTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang3.Validate;
@@ -72,6 +75,12 @@ public class RegularExpression extends UnaryExpression {
     return new RegularTransformer(pointReader, pattern);
   }
 
+  @Override
+  protected ColumnTransformer getConcreteUnaryColumnTransformer( // factory hook: wraps the child transformer in a regex-matching transformer
+      ColumnTransformer childColumnTransformer, Type returnType) {
+    return new RegularColumnTransformer(returnType, childColumnTransformer, pattern); // passes the `pattern` field of this expression
+  }
+
   @Override
   protected Expression constructExpression(Expression childExpression) {
     return new RegularExpression(childExpression, patternString, pattern);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/UnaryExpression.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/UnaryExpression.java
index 471bb1c22b..72aefade90 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/UnaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/unary/UnaryExpression.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.IdentityColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.IntermediateLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.intermediate.SingleInputColumnMultiReferenceIntermediateLayer;
@@ -37,6 +40,8 @@ import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -190,6 +195,61 @@ public abstract class UnaryExpression extends Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public boolean isMappable(TypeProvider typeProvider) {
+    return expression.isMappable(typeProvider); // delegates: decided entirely by the child
+  }
+
+  @Override
+  public ColumnTransformer constructColumnTransformer(
+      UDTFContext udtfContext,
+      TypeProvider typeProvider,
+      List<LeafColumnTransformer> leafList,
+      Map<String, List<InputLocation>> inputLocations,
+      Map<Expression, ColumnTransformer> cache, // transformers built during this pass
+      Map<Expression, ColumnTransformer> hasSeen, // results built by an earlier pass, if any
+      List<ColumnTransformer> commonTransformerList,
+      List<TSDataType> inputDataTypes,
+      int originSize) {
+    if (!cache.containsKey(this)) {
+      if (hasSeen.containsKey(this)) { // reuse it through an identity leaf
+        IdentityColumnTransformer identity =
+            new IdentityColumnTransformer(
+                TypeFactory.getType(typeProvider.getType(getExpressionString())),
+                originSize + commonTransformerList.size()); // size taken before the add below
+        ColumnTransformer columnTransformer = hasSeen.get(this);
+        columnTransformer.addReferenceCount(); // one more getColumn() consumer
+        commonTransformerList.add(columnTransformer);
+        leafList.add(identity);
+        inputDataTypes.add(typeProvider.getType(getExpressionString()));
+        cache.put(this, identity);
+      } else { // never built: recurse into the child, then wrap its transformer
+        ColumnTransformer childColumnTransformer =
+            expression.constructColumnTransformer(
+                udtfContext,
+                typeProvider,
+                leafList,
+                inputLocations,
+                cache,
+                hasSeen,
+                commonTransformerList,
+                inputDataTypes,
+                originSize);
+        cache.put(
+            this,
+            getConcreteUnaryColumnTransformer(
+                childColumnTransformer,
+                TypeFactory.getType(typeProvider.getType(getExpressionString()))));
+      }
+    }
+    ColumnTransformer res = cache.get(this);
+    res.addReferenceCount(); // counted on every call, i.e. once per requester
+    return res;
+  }
+
+  protected abstract ColumnTransformer getConcreteUnaryColumnTransformer(
+      ColumnTransformer childColumnTransformer, Type returnType); // subclass factory hook
+
   protected abstract Transformer constructTransformer(LayerPointReader pointReader);
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 704d49c5ea..878d272a8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -46,7 +46,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
@@ -108,6 +108,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -158,6 +159,9 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -166,6 +170,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.Validate;
 
 import java.io.IOException;
@@ -825,25 +831,118 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
+      final Expression filterExpression = node.getPredicate();
+      final TypeProvider typeProvider = context.getTypeProvider();
+
+      // check whether predicate contains Non-Mappable UDF
+      if (!filterExpression.isMappable(typeProvider)) {
+        throw new UnsupportedOperationException("Filter can not contain Non-Mappable UDF");
+      }
+
+      final Expression[] projectExpressions = node.getOutputExpressions();
+      final Operator inputOperator = generateOnlyChildOperator(node, context);
+      final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
+      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
+      final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes);
       final OperatorContext operatorContext =
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(),
               node.getPlanNodeId(),
-              FilterOperator.class.getSimpleName());
-      final Operator inputOperator = generateOnlyChildOperator(node, context);
-      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
-      final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
-
+              FilterAndProjectOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
+      boolean hasNonMappableUDF = false;
+      for (Expression expression : projectExpressions) {
+        if (!expression.isMappable(typeProvider)) {
+          hasNonMappableUDF = true;
+          break;
+        }
+      }
+
+      // init UDTFContext;
+      UDTFContext filterContext = new UDTFContext(node.getZoneId());
+      filterContext.constructUdfExecutors(new Expression[] {filterExpression});
+
+      // records LeafColumnTransformer of filter
+      List<LeafColumnTransformer> filterLeafColumnTransformerList = new ArrayList<>();
+
+      // records common ColumnTransformer between filter and project expressions
+      List<ColumnTransformer> commonTransformerList = new ArrayList<>();
+
+      // records LeafColumnTransformer of project expressions
+      List<LeafColumnTransformer> projectLeafColumnTransformerList = new ArrayList<>();
+
+      // records subexpression -> ColumnTransformer for filter
+      Map<Expression, ColumnTransformer> filterExpressionColumnTransformerMap = new HashMap<>();
+
+      ColumnTransformer filterOutputTransformer =
+          filterExpression.constructColumnTransformer(
+              filterContext,
+              typeProvider,
+              filterLeafColumnTransformerList,
+              inputLocations,
+              filterExpressionColumnTransformerMap,
+              ImmutableMap.of(),
+              ImmutableList.of(),
+              ImmutableList.of(),
+              0);
+
+      List<ColumnTransformer> projectOutputTransformerList = new ArrayList<>();
+
+      Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = new HashMap<>();
+
+      // init project transformer when project expressions are all mappable
+      if (!hasNonMappableUDF) {
+        // init project UDTFContext
+        UDTFContext projectContext = new UDTFContext(node.getZoneId());
+        projectContext.constructUdfExecutors(projectExpressions);
+
+        for (Expression expression : projectExpressions) {
+          projectOutputTransformerList.add(
+              expression.constructColumnTransformer(
+                  projectContext,
+                  typeProvider,
+                  projectLeafColumnTransformerList,
+                  inputLocations,
+                  projectExpressionColumnTransformerMap,
+                  filterExpressionColumnTransformerMap,
+                  commonTransformerList,
+                  filterOutputDataTypes,
+                  inputLocations.size()));
+        }
+      }
+
+      Operator filter =
+          new FilterAndProjectOperator(
+              operatorContext,
+              inputOperator,
+              filterOutputDataTypes,
+              filterLeafColumnTransformerList,
+              filterOutputTransformer,
+              commonTransformerList,
+              projectLeafColumnTransformerList,
+              projectOutputTransformerList,
+              hasNonMappableUDF);
+
+      // Project expressions don't contain Non-Mappable UDF, TransformOperator is not needed
+      if (!hasNonMappableUDF) {
+        return filter;
+      }
+
+      // has Non-Mappable UDF, we wrap a TransformOperator for further calculation
       try {
-        return new FilterOperator(
-            operatorContext,
-            inputOperator,
+        final OperatorContext transformContext =
+            context.instanceContext.addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                TransformOperator.class.getSimpleName());
+        context.getTimeSliceAllocator().recordExecutionWeight(transformContext, 1);
+        return new TransformOperator(
+            transformContext,
+            filter,
             inputDataTypes,
             inputLocations,
-            node.getPredicate(),
-            node.getOutputExpressions(),
+            projectExpressions,
             node.isKeepNull(),
             node.getZoneId(),
             context.getTypeProvider(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnCache.java
new file mode 100644
index 0000000000..2286f28a5d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnCache.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class ColumnCache { // holds one Column that may be read a fixed number of times
+
+  private int referenceCount; // getColumn() calls still allowed for the cached column
+  private Column column; // null when nothing is cached or after the last allowed read
+
+  public ColumnCache() {}
+
+  public Column getColumn() {
+    referenceCount--; // consume one reference before the check below
+    checkArgument(referenceCount >= 0, "Exceed max call times of getColumn");
+    Column res = this.column;
+    // last allowed read: drop our reference so the cache no longer retains the column
+    if (referenceCount == 0) {
+      this.column = null;
+    }
+    return res;
+  }
+
+  public int getPositionCount() {
+    return column != null ? column.getPositionCount() : 0; // 0 when nothing is cached
+  }
+
+  public void cacheColumn(Column column, int referenceCount) {
+    this.referenceCount = referenceCount; // replaces any previous count and column
+    this.column = column;
+  }
+
+  public boolean hasCached() {
+    return column != null; // false again once getColumn() has released the column
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnTransformer.java
new file mode 100644
index 0000000000..d830c54d22
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ColumnTransformer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+public abstract class ColumnTransformer { // computes a Column on demand and caches it
+
+  protected final Type returnType;
+
+  protected final ColumnCache columnCache;
+
+  protected int referenceCount; // number of consumers; becomes the cache's read budget
+
+  public ColumnTransformer(Type returnType) {
+    this.returnType = returnType;
+    this.columnCache = new ColumnCache();
+    referenceCount = 0;
+  }
+
+  public void tryEvaluate() {
+    if (!columnCache.hasCached()) { // skip while a result is still waiting to be read
+      evaluate();
+    }
+  }
+
+  public Column getColumn() {
+    return columnCache.getColumn(); // uses up one reference of the cached column
+  }
+
+  public void addReferenceCount() {
+    referenceCount++; // picked up by the next initializeColumnCache call
+  }
+
+  public void initializeColumnCache(Column column) {
+    columnCache.cacheColumn(column, referenceCount); // allows referenceCount reads
+  }
+
+  public int getColumnCachePositionCount() {
+    return columnCache.getPositionCount();
+  }
+
+  public Type getType() {
+    return returnType;
+  }
+
+  public boolean isReturnTypeNumeric() {
+    TypeEnum typeEnum = returnType.getTypeEnum();
+    return typeEnum.equals(TypeEnum.INT32)
+        || typeEnum.equals(TypeEnum.INT64)
+        || typeEnum.equals(TypeEnum.FLOAT)
+        || typeEnum.equals(TypeEnum.DOUBLE);
+  }
+
+  /** Performs the calculation; tryEvaluate() skips this call while a result is cached. */
+  protected abstract void evaluate();
+
+  protected abstract void checkType(); // type validation hook; implementations may throw
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticAdditionColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticAdditionColumnTransformer.java
index 9f4bc80438..8eaf106f31 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticAdditionColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class ArithmeticAdditionColumnTransformer extends ArithmeticBinaryColumnTransformer {
+  public ArithmeticAdditionColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected double transform(double d1, double d2) {
+    return d1 + d2; // double addition; null rows are filtered out by doTransform
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticBinaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticBinaryColumnTransformer.java
new file mode 100644
index 0000000000..cdcd7ec8b0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticBinaryColumnTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+public abstract class ArithmeticBinaryColumnTransformer extends BinaryColumnTransformer {
+  public ArithmeticBinaryColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
+  }
+
+  @Override
+  protected void doTransform(
+      Column leftColumn, Column rightColumn, ColumnBuilder builder, int positionCount) {
+    for (int i = 0; i < positionCount; i++) {
+      if (!leftColumn.isNull(i) && !rightColumn.isNull(i)) { // both operands present
+        returnType.writeDouble( // both operands are read through getDouble
+            builder,
+            transform(
+                leftTransformer.getType().getDouble(leftColumn, i),
+                rightTransformer.getType().getDouble(rightColumn, i)));
+      } else {
+        builder.appendNull(); // a null on either side yields a null result row
+      }
+    }
+  }
+
+  @Override
+  protected void checkType() { // runs from the BinaryColumnTransformer constructor
+    if (!leftTransformer.isReturnTypeNumeric() || !rightTransformer.isReturnTypeNumeric()) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  protected abstract double transform(double d1, double d2); // d1 = left value, d2 = right
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticDivisionColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticDivisionColumnTransformer.java
index 9f4bc80438..9310d2ff5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticDivisionColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class ArithmeticDivisionColumnTransformer extends ArithmeticBinaryColumnTransformer {
+  public ArithmeticDivisionColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected double transform(double d1, double d2) {
+    return d1 / d2; // double division: a zero divisor gives +/-Infinity or NaN, no exception
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticModuloColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticModuloColumnTransformer.java
index 9f4bc80438..ca923595da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticModuloColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class ArithmeticModuloColumnTransformer extends ArithmeticBinaryColumnTransformer {
+  public ArithmeticModuloColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected double transform(double d1, double d2) {
+    return d1 % d2; // double remainder: sign follows d1; a zero divisor gives NaN
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticMultiplicationColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticMultiplicationColumnTransformer.java
index 9f4bc80438..483b3096f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticMultiplicationColumnTransformer.java
@@ -17,26 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+public class ArithmeticMultiplicationColumnTransformer extends ArithmeticBinaryColumnTransformer {
 
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+  public ArithmeticMultiplicationColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected double transform(double d1, double d2) {
+    return d1 * d2; // double multiplication; overflow yields +/-Infinity
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticSubtractionColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticSubtractionColumnTransformer.java
index 9f4bc80438..bd694ccbff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/ArithmeticSubtractionColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class ArithmeticSubtractionColumnTransformer extends ArithmeticBinaryColumnTransformer {
+  public ArithmeticSubtractionColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected double transform(double d1, double d2) {
+    return d1 - d2; // left operand minus right operand
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/BinaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/BinaryColumnTransformer.java
new file mode 100644
index 0000000000..76810d6311
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/BinaryColumnTransformer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+public abstract class BinaryColumnTransformer extends ColumnTransformer {
+
+  protected final ColumnTransformer leftTransformer;
+
+  protected final ColumnTransformer rightTransformer;
+
+  public BinaryColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType);
+    this.leftTransformer = leftTransformer;
+    this.rightTransformer = rightTransformer;
+    checkType(); // NOTE: overridable call in a constructor; subclass fields are unset here
+  }
+
+  @Override
+  public void evaluate() { // evaluates both children (unless cached), then combines them
+    leftTransformer.tryEvaluate();
+    rightTransformer.tryEvaluate();
+    // read the count first: getColumn() may release the cached column, after which it is 0
+    int positionCount = leftTransformer.getColumnCachePositionCount();
+    Column leftColumn = leftTransformer.getColumn();
+    Column rightColumn = rightTransformer.getColumn();
+
+    ColumnBuilder builder = returnType.createColumnBuilder(positionCount);
+    doTransform(leftColumn, rightColumn, builder, positionCount);
+    initializeColumnCache(builder.build()); // publish the result for this node's consumers
+  }
+
+  protected abstract void doTransform(
+      Column leftColumn, Column rightColumn, ColumnBuilder builder, int positionCount);
+
+  public ColumnTransformer getLeftTransformer() {
+    return leftTransformer;
+  }
+
+  public ColumnTransformer getRightTransformer() {
+    return rightTransformer;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareBinaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareBinaryColumnTransformer.java
new file mode 100644
index 0000000000..f184289e3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareBinaryColumnTransformer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+public abstract class CompareBinaryColumnTransformer extends BinaryColumnTransformer {
+
+  public CompareBinaryColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
+  }
+
+  @Override
+  protected void doTransform(
+      Column leftColumn, Column rightColumn, ColumnBuilder builder, int positionCount) {
+    for (int i = 0; i < positionCount; i++) {
+      if (!leftColumn.isNull(i) && !rightColumn.isNull(i)) {
+        boolean flag;
+        // BINARY operands are compared by string value (branch keyed on the left type only)
+        if (leftTransformer.getType().getTypeEnum().equals(TypeEnum.BINARY)) {
+          flag =
+              transform(
+                  TransformUtils.compare(
+                      leftTransformer.getType().getBinary(leftColumn, i).getStringValue(),
+                      rightTransformer.getType().getBinary(rightColumn, i).getStringValue()));
+        } else if (leftTransformer.getType().getTypeEnum().equals(TypeEnum.BOOLEAN)) {
+          flag =
+              transform(
+                  Boolean.compare(
+                      leftTransformer.getType().getBoolean(leftColumn, i),
+                      rightTransformer.getType().getBoolean(rightColumn, i)));
+        } else { // numeric: both sides read as double (large INT64 may lose precision - verify)
+          flag =
+              transform(
+                  compare(
+                      leftTransformer.getType().getDouble(leftColumn, i),
+                      rightTransformer.getType().getDouble(rightColumn, i)));
+        }
+        returnType.writeBoolean(builder, flag);
+      } else {
+        builder.appendNull(); // either operand is null, so the result is null
+      }
+    }
+  }
+
+  @Override
+  protected void checkType() {
+    // BOOLEAN operands are rejected here; the == and != transformers override this check
+    if (leftTransformer.getType().getTypeEnum().equals(TypeEnum.BOOLEAN)
+        || rightTransformer.getType().getTypeEnum().equals(TypeEnum.BOOLEAN)) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  protected int compare(double d1, double d2) {
+    return Double.compare(d1, d2);
+  }
+
+  /**
+   * Maps a three-way comparison result to the boolean outcome of the concrete operator.
+   *
+   * @param flag result of comparing left with right: negative, zero or positive
+   * @return true if that comparison result satisfies this operator, false otherwise
+   */
+  protected abstract boolean transform(int flag);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareEqualToColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareEqualToColumnTransformer.java
new file mode 100644
index 0000000000..3c91fd6c2a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareEqualToColumnTransformer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+public class CompareEqualToColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareEqualToColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
+  }
+
+  @Override
+  protected final void checkType() {
+    if (leftTransformer.getType().getTypeEnum().equals(rightTransformer.getType().getTypeEnum())) {
+      return; // identical types are always accepted, including BOOLEAN and BINARY
+    }
+
+    // Operands of different types are comparable only when both of them are numeric
+    if (!leftTransformer.isReturnTypeNumeric() || !rightTransformer.isReturnTypeNumeric()) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  @Override
+  protected boolean transform(int flag) {
+    return flag == 0; // a zero comparison result means the operands are equal
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterEqualColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterEqualColumnTransformer.java
index 9f4bc80438..807a070bcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterEqualColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class CompareGreaterEqualColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareGreaterEqualColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(int flag) {
+    return flag >= 0; // flag is the left-vs-right comparison result: true when left >= right
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterThanColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterThanColumnTransformer.java
index 9f4bc80438..c37fab6d98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareGreaterThanColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class CompareGreaterThanColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareGreaterThanColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(int flag) {
+    return flag > 0; // flag is the left-vs-right comparison result: true when left > right
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessEqualColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessEqualColumnTransformer.java
index 9f4bc80438..63d1b152d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessEqualColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class CompareLessEqualColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareLessEqualColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(int flag) {
+    return flag <= 0; // flag is the left-vs-right comparison result: true when left <= right
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessThanColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessThanColumnTransformer.java
index 9f4bc80438..4fc6211ddd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareLessThanColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class CompareLessThanColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareLessThanColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(int flag) {
+    return flag < 0; // flag is the left-vs-right comparison result: true when left < right
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareNonEqualColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareNonEqualColumnTransformer.java
new file mode 100644
index 0000000000..4c58760bce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/CompareNonEqualColumnTransformer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+public class CompareNonEqualColumnTransformer extends CompareBinaryColumnTransformer {
+  public CompareNonEqualColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
+  }
+
+  @Override
+  protected final void checkType() {
+    if (leftTransformer.getType().getTypeEnum().equals(rightTransformer.getType().getTypeEnum())) {
+      return; // identical types are always accepted, including BOOLEAN and BINARY
+    }
+
+    // Operands of different types are comparable only when both of them are numeric
+    if (!leftTransformer.isReturnTypeNumeric() || !rightTransformer.isReturnTypeNumeric()) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  @Override
+  protected boolean transform(int flag) {
+    return flag != 0; // a non-zero comparison result means the operands differ
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicAndColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicAndColumnTransformer.java
index 9f4bc80438..c64725522e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicAndColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class LogicAndColumnTransformer extends LogicBinaryColumnTransformer {
+  public LogicAndColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(boolean left, boolean right) {
+    return left && right; // logical AND of the two operand values
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicBinaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicBinaryColumnTransformer.java
new file mode 100644
index 0000000000..c6edc42c20
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicBinaryColumnTransformer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+public abstract class LogicBinaryColumnTransformer extends BinaryColumnTransformer {
+  public LogicBinaryColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
+  }
+
+  @Override
+  protected void doTransform(
+      Column leftColumn, Column rightColumn, ColumnBuilder builder, int positionCount) {
+    for (int i = 0; i < positionCount; i++) {
+      if (!leftColumn.isNull(i) && !rightColumn.isNull(i)) {
+        returnType.writeBoolean(
+            builder,
+            transform(
+                leftTransformer.getType().getBoolean(leftColumn, i),
+                rightTransformer.getType().getBoolean(rightColumn, i)));
+      } else if (!leftColumn.isNull(i)) { // only the right operand is null: treat it as false
+        returnType.writeBoolean(
+            builder, transform(leftTransformer.getType().getBoolean(leftColumn, i), false));
+      } else if (!rightColumn.isNull(i)) { // only the left operand is null: treat it as false
+        returnType.writeBoolean(
+            builder, transform(false, rightTransformer.getType().getBoolean(rightColumn, i)));
+      } else { // both operands are null: the result is null
+        builder.appendNull();
+      }
+    }
+  }
+
+  @Override
+  protected void checkType() { // both operands must be BOOLEAN
+    if (!leftTransformer.getType().getTypeEnum().equals(TypeEnum.BOOLEAN)
+        || !rightTransformer.getType().getTypeEnum().equals(TypeEnum.BOOLEAN)) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  protected abstract boolean transform(boolean left, boolean right);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicOrColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicOrColumnTransformer.java
index 9f4bc80438..135a8ff348 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/binary/LogicOrColumnTransformer.java
@@ -17,26 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.binary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class LogicOrColumnTransformer extends LogicBinaryColumnTransformer {
+  public LogicOrColumnTransformer(
+      Type returnType, ColumnTransformer leftTransformer, ColumnTransformer rightTransformer) {
+    super(returnType, leftTransformer, rightTransformer);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected boolean transform(boolean left, boolean right) {
+    return left || right; // logical OR of the two operand values
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/ConstantColumnTransformer.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/ConstantColumnTransformer.java
index 9f4bc80438..0a6c10b1b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/ConstantColumnTransformer.java
@@ -17,26 +17,24 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.leaf;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+public class ConstantColumnTransformer extends LeafColumnTransformer {
 
-public abstract class LeafOperand extends Expression {
+  private final Column value; // presumably a single-position column holding the constant
 
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+  public ConstantColumnTransformer(Type returnType, Column value) {
+    super(returnType);
+    this.value = value;
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  public void initFromTsBlock(TsBlock input) { // sizes the constant column to the input block
+    initializeColumnCache(new RunLengthEncodedColumn(value, input.getPositionCount()));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/IdentityColumnTransformer.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/IdentityColumnTransformer.java
index 9f4bc80438..4b419d290f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/IdentityColumnTransformer.java
@@ -17,26 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.leaf;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
+/**
+ * A special transformer that outputs its input column unchanged.
+ *
+ * <p>That is, it is simply the identity function f(x) = x.
+ */
+public class IdentityColumnTransformer extends LeafColumnTransformer {
+  // index of the value column to read from the input TsBlock
+  private final int inputIndex;
 
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+  public IdentityColumnTransformer(Type returnType, int inputIndex) {
+    super(returnType);
+    this.inputIndex = inputIndex;
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  public void initFromTsBlock(TsBlock input) {
+    initializeColumnCache(input.getColumn(inputIndex));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/LeafColumnTransformer.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/LeafColumnTransformer.java
index 9f4bc80438..1b7cece4a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/LeafColumnTransformer.java
@@ -17,26 +17,26 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.leaf;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
+public abstract class LeafColumnTransformer extends ColumnTransformer {
+  public LeafColumnTransformer(Type returnType) {
+    super(returnType);
+  }
 
   @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+  public void evaluate() {
+    // nothing to evaluate: subclasses fill the column cache in initFromTsBlock
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  public void checkType() {
+    // nothing to check: a leaf has no child transformers
   }
+
+  public abstract void initFromTsBlock(TsBlock input);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/TimeColumnTransformer.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/TimeColumnTransformer.java
index 9f4bc80438..240c4d8e1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/leaf/TimeColumnTransformer.java
@@ -17,26 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.leaf;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+public class TimeColumnTransformer extends LeafColumnTransformer {
+  public TimeColumnTransformer(Type returnType) {
+    super(returnType);
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  public void initFromTsBlock(TsBlock input) {
+    initializeColumnCache(input.getTimeColumn()); // output is the input block's time column
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
new file mode 100644
index 0000000000..87cecc817a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.multi;
+
+import org.apache.iotdb.db.mpp.transformation.dag.adapter.ElasticSerializableRowRecordListBackedMultiColumnRow;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+/**
+ * Passes the values of its input transformers to a {@link UDTFExecutor} one row at a time and
+ * gathers the executor's results into a single output column.
+ */
+public class MappableUDFColumnTransformer extends ColumnTransformer {
+
+  private final ColumnTransformer[] inputColumnTransformers;
+
+  private final UDTFExecutor executor;
+
+  private final TSDataType[] inputDataTypes;
+
+  public MappableUDFColumnTransformer(
+      Type returnType,
+      ColumnTransformer[] inputColumnTransformers,
+      TSDataType[] inputDataTypes,
+      UDTFExecutor executor) {
+    super(returnType);
+    this.inputColumnTransformers = inputColumnTransformers;
+    this.inputDataTypes = inputDataTypes;
+    this.executor = executor;
+  }
+
+  /**
+   * Evaluates every input, runs the executor once per position and caches the column built from
+   * its results; a {@code null} executor result becomes a null entry.
+   */
+  @Override
+  public void evaluate() {
+    for (ColumnTransformer input : inputColumnTransformers) {
+      input.tryEvaluate();
+    }
+
+    // attention: the position count has to be read before any getColumn call
+    final int rowCount = inputColumnTransformers[0].getColumnCachePositionCount();
+    final int inputCount = inputColumnTransformers.length;
+    final Column[] inputColumns = new Column[inputCount];
+    for (int index = 0; index < inputCount; index++) {
+      inputColumns[index] = inputColumnTransformers[index].getColumn();
+    }
+
+    final ColumnBuilder resultBuilder = returnType.createColumnBuilder(rowCount);
+    for (int position = 0; position < rowCount; position++) {
+      // construct the input row for the executor
+      ElasticSerializableRowRecordListBackedMultiColumnRow inputRow =
+          new ElasticSerializableRowRecordListBackedMultiColumnRow(inputDataTypes);
+      inputRow.setRowRecord(collectRowValues(inputColumns, position));
+      executor.execute(inputRow);
+
+      Object result = executor.getCurrentValue();
+      if (result == null) {
+        resultBuilder.appendNull();
+      } else {
+        returnType.writeObject(resultBuilder, result);
+      }
+    }
+    initializeColumnCache(resultBuilder.build());
+  }
+
+  @Override
+  protected void checkType() {
+    // do nothing
+  }
+
+  /** Reads one value per input column at {@code position}; null entries stay {@code null}. */
+  private static Object[] collectRowValues(Column[] inputColumns, int position) {
+    Object[] values = new Object[inputColumns.length];
+    for (int index = 0; index < inputColumns.length; index++) {
+      Column source = inputColumns[index];
+      values[index] = source.isNull(position) ? null : source.getObject(position);
+    }
+    return values;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/BetweenColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/BetweenColumnTransformer.java
new file mode 100644
index 0000000000..c166943e15
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/BetweenColumnTransformer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.ternary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.BinaryType;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+/** Evaluates {@code first BETWEEN second AND third}, or its negation, position by position. */
+public class BetweenColumnTransformer extends CompareTernaryColumnTransformer {
+  private final boolean isNotBetween;
+
+  public BetweenColumnTransformer(
+      Type returnType,
+      ColumnTransformer firstColumnTransformer,
+      ColumnTransformer secondColumnTransformer,
+      ColumnTransformer thirdColumnTransformer,
+      boolean isNotBetween) {
+    super(returnType, firstColumnTransformer, secondColumnTransformer, thirdColumnTransformer);
+    this.isNotBetween = isNotBetween;
+  }
+
+  /**
+   * Writes, for every position where all three inputs are non-null, whether the first value lies
+   * in the closed range [second, third], inverted when {@code isNotBetween} is set. Any null input
+   * yields a null output. Text operands are compared as strings, everything else as doubles.
+   */
+  @Override
+  protected void doTransform(
+      Column firstColumn,
+      Column secondColumn,
+      Column thirdColumn,
+      ColumnBuilder builder,
+      int positionCount) {
+    for (int position = 0; position < positionCount; position++) {
+      if (firstColumn.isNull(position)
+          || secondColumn.isNull(position)
+          || thirdColumn.isNull(position)) {
+        builder.appendNull();
+        continue;
+      }
+      boolean withinRange =
+          firstColumnTransformer.getType() instanceof BinaryType
+              ? isTextWithinRange(firstColumn, secondColumn, thirdColumn, position)
+              : isNumberWithinRange(firstColumn, secondColumn, thirdColumn, position);
+      returnType.writeBoolean(builder, withinRange ^ isNotBetween);
+    }
+  }
+
+  /** String comparison; the upper bound is only read once the lower bound check has passed. */
+  private boolean isTextWithinRange(
+      Column firstColumn, Column secondColumn, Column thirdColumn, int position) {
+    String value =
+        firstColumnTransformer.getType().getBinary(firstColumn, position).getStringValue();
+    String lowerBound =
+        secondColumnTransformer.getType().getBinary(secondColumn, position).getStringValue();
+    if (TransformUtils.compare(value, lowerBound) < 0) {
+      return false;
+    }
+    String upperBound =
+        thirdColumnTransformer.getType().getBinary(thirdColumn, position).getStringValue();
+    return TransformUtils.compare(value, upperBound) <= 0;
+  }
+
+  /** Double comparison; the upper bound is only read once the lower bound check has passed. */
+  private boolean isNumberWithinRange(
+      Column firstColumn, Column secondColumn, Column thirdColumn, int position) {
+    double value = firstColumnTransformer.getType().getDouble(firstColumn, position);
+    double lowerBound = secondColumnTransformer.getType().getDouble(secondColumn, position);
+    if (Double.compare(value, lowerBound) < 0) {
+      return false;
+    }
+    double upperBound = thirdColumnTransformer.getType().getDouble(thirdColumn, position);
+    return Double.compare(value, upperBound) <= 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/CompareTernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/CompareTernaryColumnTransformer.java
new file mode 100644
index 0000000000..41c1e1ddbf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/CompareTernaryColumnTransformer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.ternary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+/**
+ * Base class for three-operand comparisons: evaluates the three inputs, hands their columns to
+ * {@link #doTransform} and caches the column built from the result.
+ */
+public abstract class CompareTernaryColumnTransformer extends TernaryColumnTransformer {
+  public CompareTernaryColumnTransformer(
+      Type returnType,
+      ColumnTransformer firstColumnTransformer,
+      ColumnTransformer secondColumnTransformer,
+      ColumnTransformer thirdColumnTransformer) {
+    super(returnType, firstColumnTransformer, secondColumnTransformer, thirdColumnTransformer);
+  }
+
+  @Override
+  public void evaluate() {
+    firstColumnTransformer.tryEvaluate();
+    secondColumnTransformer.tryEvaluate();
+    thirdColumnTransformer.tryEvaluate();
+    // attention: get positionCount before calling getColumn
+    int positionCount = firstColumnTransformer.getColumnCachePositionCount();
+    Column firstColumn = firstColumnTransformer.getColumn();
+    Column secondColumn = secondColumnTransformer.getColumn();
+    Column thirdColumn = thirdColumnTransformer.getColumn();
+    ColumnBuilder columnBuilder = returnType.createColumnBuilder(positionCount);
+    doTransform(firstColumn, secondColumn, thirdColumn, columnBuilder, positionCount);
+    initializeColumnCache(columnBuilder.build());
+  }
+
+  /**
+   * Validates the operand types. Three operands of the same type are always accepted; operands of
+   * different types are accepted only when none of them is BOOLEAN or BINARY.
+   *
+   * @throws UnsupportedOperationException if the operand types cannot be compared with each other
+   */
+  @Override
+  protected final void checkType() {
+    TypeEnum firstType = firstColumnTransformer.getType().getTypeEnum();
+    TypeEnum secondType = secondColumnTransformer.getType().getTypeEnum();
+    TypeEnum thirdType = thirdColumnTransformer.getType().getTypeEnum();
+    if (firstType.equals(secondType) && firstType.equals(thirdType)) {
+      return;
+    }
+
+    // Operands of different types are compared through their double values, so a text operand
+    // mixed with numeric ones is rejected here instead of failing later in the middle of a batch.
+    if (isBooleanOrBinary(firstType)
+        || isBooleanOrBinary(secondType)
+        || isBooleanOrBinary(thirdType)) {
+      throw new UnsupportedOperationException("Unsupported Type");
+    }
+  }
+
+  /** Tells whether {@code type} is one of the two types that cannot take part in a mixed set. */
+  private static boolean isBooleanOrBinary(TypeEnum type) {
+    return type.equals(TypeEnum.BOOLEAN) || type.equals(TypeEnum.BINARY);
+  }
+
+  /** Called by {@link #evaluate()} with the evaluated columns and a builder for the result. */
+  protected abstract void doTransform(
+      Column firstColumn,
+      Column secondColumn,
+      Column thirdColumn,
+      ColumnBuilder builder,
+      int positionCount);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
new file mode 100644
index 0000000000..2032215b0f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.ternary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+/**
+ * Base class for transformers with three operands; it stores the operands and runs the type check.
+ */
+public abstract class TernaryColumnTransformer extends ColumnTransformer {
+
+  protected ColumnTransformer firstColumnTransformer;
+
+  protected ColumnTransformer secondColumnTransformer;
+
+  protected ColumnTransformer thirdColumnTransformer;
+
+  public TernaryColumnTransformer(
+      Type returnType,
+      ColumnTransformer firstColumnTransformer,
+      ColumnTransformer secondColumnTransformer,
+      ColumnTransformer thirdColumnTransformer) {
+    super(returnType);
+    this.firstColumnTransformer = firstColumnTransformer;
+    this.secondColumnTransformer = secondColumnTransformer;
+    this.thirdColumnTransformer = thirdColumnTransformer;
+    // validate only after the three operands are assigned; this call runs before any subclass
+    // constructor body, so checkType() overrides can rely on nothing but the fields set above
+    checkType();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/ArithmeticNegationColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/ArithmeticNegationColumnTransformer.java
new file mode 100644
index 0000000000..13bbfe393c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/ArithmeticNegationColumnTransformer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+/** Arithmetic negation ({@code -x}) of a numeric child column. */
+public class ArithmeticNegationColumnTransformer extends UnaryColumnTransformer {
+  public ArithmeticNegationColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer) {
+    super(returnType, childColumnTransformer);
+  }
+
+  /** Negates every non-null value, read and written as a double; null inputs stay null. */
+  @Override
+  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+    for (int i = 0, n = column.getPositionCount(); i < n; i++) {
+      if (!column.isNull(i)) {
+        returnType.writeDouble(
+            columnBuilder, -childColumnTransformer.getType().getDouble(column, i));
+      } else {
+        columnBuilder.appendNull();
+      }
+    }
+  }
+
+  /**
+   * Rejects children whose return type is not numeric.
+   *
+   * @throws UnsupportedOperationException if the child's return type is not numeric
+   */
+  @Override
+  protected final void checkType() {
+    if (!childColumnTransformer.isReturnTypeNumeric()) {
+      // report the child's type: it is the operand that failed the check, not returnType
+      throw new UnsupportedOperationException(
+          "Unsupported Type: " + childColumnTransformer.getType().getTypeEnum());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/InColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/InColumnTransformer.java
new file mode 100644
index 0000000000..994c8c6c95
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/InColumnTransformer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Evaluates {@code child IN (values)}, or {@code child NOT IN (values)} when {@code isNotIn} is
+ * set. The literals arrive as strings and are parsed once, in the constructor, into a set whose
+ * element type matches the child column's type.
+ */
+public class InColumnTransformer extends UnaryColumnTransformer {
+  // true for NOT IN: the membership result is inverted before it is written
+  private final boolean isNotIn;
+
+  private final TypeEnum childType;
+
+  // only the set that corresponds to childType is populated, the others stay null
+  private Set<Integer> intSet;
+  private Set<Long> longSet;
+  private Set<Float> floatSet;
+  private Set<Double> doubleSet;
+  private Set<Boolean> booleanSet;
+  private Set<String> stringSet;
+
+  public InColumnTransformer(
+      Type returnType,
+      ColumnTransformer childColumnTransformer,
+      boolean isNotIn,
+      Set<String> values) {
+    super(returnType, childColumnTransformer);
+    this.isNotIn = isNotIn;
+    this.childType = childColumnTransformer.getType().getTypeEnum();
+    initTypedSet(values);
+  }
+
+  /** Writes the (possibly inverted) membership result per position; null inputs stay null. */
+  @Override
+  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+    final int positionCount = column.getPositionCount();
+    for (int position = 0; position < positionCount; position++) {
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+        continue;
+      }
+      returnType.writeBoolean(columnBuilder, isMember(column, position) != isNotIn);
+    }
+  }
+
+  /** Looks the value at {@code position} up in the set that belongs to the child type. */
+  private boolean isMember(Column column, int position) {
+    switch (childType) {
+      case INT32:
+        return intSet.contains(column.getInt(position));
+      case INT64:
+        return longSet.contains(column.getLong(position));
+      case FLOAT:
+        return floatSet.contains(column.getFloat(position));
+      case DOUBLE:
+        return doubleSet.contains(column.getDouble(position));
+      case BOOLEAN:
+        return booleanSet.contains(column.getBoolean(position));
+      case BINARY:
+        return stringSet.contains(column.getBinary(position).getStringValue());
+      default:
+        throw new UnsupportedOperationException("unsupported data type: " + childType);
+    }
+  }
+
+  /** Parses the string literals into the typed set selected by {@code childType}. */
+  private void initTypedSet(Set<String> values) {
+    switch (childType) {
+      case INT32:
+        intSet = new HashSet<>();
+        values.forEach(literal -> intSet.add(Integer.valueOf(literal)));
+        break;
+      case INT64:
+        longSet = new HashSet<>();
+        values.forEach(literal -> longSet.add(Long.valueOf(literal)));
+        break;
+      case FLOAT:
+        floatSet = new HashSet<>();
+        values.forEach(literal -> floatSet.add(Float.valueOf(literal)));
+        break;
+      case DOUBLE:
+        doubleSet = new HashSet<>();
+        values.forEach(literal -> doubleSet.add(Double.valueOf(literal)));
+        break;
+      case BOOLEAN:
+        booleanSet = new HashSet<>();
+        values.forEach(literal -> booleanSet.add(Boolean.valueOf(literal)));
+        break;
+      case BINARY:
+        stringSet = values;
+        break;
+      default:
+        throw new UnsupportedOperationException("unsupported data type: " + childType);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/IsNullColumnTransformer.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/IsNullColumnTransformer.java
index 9f4bc80438..4b477c9f3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/IsNullColumnTransformer.java
@@ -17,26 +17,31 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+/** Evaluates IS NULL, or IS NOT NULL when {@code isNot} is set, on the child column. */
+public class IsNullColumnTransformer extends UnaryColumnTransformer {
 
-public abstract class LeafOperand extends Expression {
+  private final boolean isNot;
 
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
+  public IsNullColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer, boolean isNot) {
+    super(returnType, childColumnTransformer);
+    this.isNot = isNot;
   }
 
   @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+    // null inputs are never propagated: every position yields true or false
+    final int positionCount = column.getPositionCount();
+    for (int position = 0; position < positionCount; position++) {
+      boolean isNull = column.isNull(position);
+      returnType.writeBoolean(columnBuilder, isNull != isNot);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/LogicNotColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/LogicNotColumnTransformer.java
new file mode 100644
index 0000000000..d96685f836
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/LogicNotColumnTransformer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+/** Logical NOT over a BOOLEAN child column; null inputs produce null outputs. */
+public class LogicNotColumnTransformer extends UnaryColumnTransformer {
+  public LogicNotColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) {
+    super(returnType, childColumnTransformer);
+  }
+
+  @Override
+  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+    final int positionCount = column.getPositionCount();
+    for (int position = 0; position < positionCount; position++) {
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+        continue;
+      }
+      boolean operand = childColumnTransformer.getType().getBoolean(column, position);
+      returnType.writeBoolean(columnBuilder, !operand);
+    }
+  }
+
+  @Override
+  protected void checkType() {
+    TypeEnum childType = childColumnTransformer.getType().getTypeEnum();
+    if (childType.equals(TypeEnum.BOOLEAN)) {
+      return;
+    }
+    throw new UnsupportedOperationException("Unsupported Type: " + childType);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/RegularColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/RegularColumnTransformer.java
new file mode 100644
index 0000000000..5de3496612
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/RegularColumnTransformer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+import java.util.regex.Pattern;
+
+/** Tests each text value of a BINARY child column against a fixed, pre-compiled pattern. */
+public class RegularColumnTransformer extends UnaryColumnTransformer {
+  private final Pattern pattern;
+
+  public RegularColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer, Pattern pattern) {
+    super(returnType, childColumnTransformer);
+    this.pattern = pattern;
+  }
+
+  @Override
+  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+    final int positionCount = column.getPositionCount();
+    for (int position = 0; position < positionCount; position++) {
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+        continue;
+      }
+      String text = childColumnTransformer.getType().getBinary(column, position).getStringValue();
+      // the row is true when Matcher.find() succeeds on the value
+      returnType.writeBoolean(columnBuilder, pattern.matcher(text).find());
+    }
+  }
+
+  @Override
+  protected void checkType() {
+    TypeEnum childType = childColumnTransformer.getType().getTypeEnum();
+    if (childType.equals(TypeEnum.BINARY)) {
+      return;
+    }
+    throw new UnsupportedOperationException("Unsupported Type: " + childType);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
new file mode 100644
index 0000000000..d84b538cc6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.column.unary;
+
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+/**
+ * Base class for transformers with a single child: {@link #evaluate()} evaluates the child, lets
+ * the subclass transform the child's column and caches the column that was built.
+ */
+public abstract class UnaryColumnTransformer extends ColumnTransformer {
+  protected ColumnTransformer childColumnTransformer;
+
+  public UnaryColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) {
+    super(returnType);
+    this.childColumnTransformer = childColumnTransformer;
+    checkType();
+  }
+
+  /** Evaluates the child, then builds and caches this transformer's column from its output. */
+  @Override
+  public void evaluate() {
+    childColumnTransformer.tryEvaluate();
+    Column column = childColumnTransformer.getColumn();
+    ColumnBuilder columnBuilder = returnType.createColumnBuilder(column.getPositionCount());
+    doTransform(column, columnBuilder);
+    initializeColumnCache(columnBuilder.build());
+  }
+
+  @Override
+  protected void checkType() {
+    // no type restriction by default; subclasses override this to validate the child's type
+  }
+
+  /**
+   * Fills {@code columnBuilder} from {@code column}; {@link #evaluate()} builds and caches it.
+   *
+   * @param column the child's output column
+   * @param columnBuilder builder created for {@code column.getPositionCount()} positions
+   */
+  protected abstract void doTransform(Column column, ColumnBuilder columnBuilder);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
index 3bd7f927dd..28fb5f9cae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.mpp.transformation.api.YieldableState;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
-import java.util.Objects;
 
 public abstract class Transformer implements LayerPointReader {
 
@@ -119,24 +118,4 @@ public abstract class Transformer implements LayerPointReader {
   public final boolean isCurrentNull() {
     return currentNull;
   }
-
-  protected static int compare(CharSequence cs1, CharSequence cs2) {
-    if (Objects.requireNonNull(cs1) == Objects.requireNonNull(cs2)) {
-      return 0;
-    }
-
-    if (cs1.getClass() == cs2.getClass() && cs1 instanceof Comparable) {
-      return ((Comparable<Object>) cs1).compareTo(cs2);
-    }
-
-    for (int i = 0, len = Math.min(cs1.length(), cs2.length()); i < len; i++) {
-      char a = cs1.charAt(i);
-      char b = cs2.charAt(i);
-      if (a != b) {
-        return a - b;
-      }
-    }
-
-    return cs1.length() - cs2.length();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareBinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareBinaryTransformer.java
index e81b4e96c4..da23623a1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareBinaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareBinaryTransformer.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
-import java.util.Objects;
 
 public abstract class CompareBinaryTransformer extends BinaryTransformer {
 
@@ -51,26 +50,6 @@ public abstract class CompareBinaryTransformer extends BinaryTransformer {
 
   protected abstract Evaluator constructTextEvaluator();
 
-  protected static int compare(CharSequence cs1, CharSequence cs2) {
-    if (Objects.requireNonNull(cs1) == Objects.requireNonNull(cs2)) {
-      return 0;
-    }
-
-    if (cs1.getClass() == cs2.getClass() && cs1 instanceof Comparable) {
-      return ((Comparable<Object>) cs1).compareTo(cs2);
-    }
-
-    for (int i = 0, len = Math.min(cs1.length(), cs2.length()); i < len; i++) {
-      char a = cs1.charAt(i);
-      char b = cs2.charAt(i);
-      if (a != b) {
-        return a - b;
-      }
-    }
-
-    return cs1.length() - cs2.length();
-  }
-
   @Override
   protected final void checkType() {
     if (leftPointReaderDataType.equals(rightPointReaderDataType)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareEqualToTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareEqualToTransformer.java
index 4ea813e083..898ad6b19d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareEqualToTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareEqualToTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class CompareEqualToTransformer extends CompareBinaryTransformer {
 
@@ -40,7 +41,7 @@ public class CompareEqualToTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             == 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterEqualTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterEqualTransformer.java
index b5d53f0dc9..524d050c47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterEqualTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterEqualTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class CompareGreaterEqualTransformer extends CompareBinaryTransformer {
 
@@ -40,7 +41,7 @@ public class CompareGreaterEqualTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             >= 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterThanTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterThanTransformer.java
index 798df4a1b7..94451ecd47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterThanTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareGreaterThanTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class CompareGreaterThanTransformer extends CompareBinaryTransformer {
 
@@ -40,7 +41,7 @@ public class CompareGreaterThanTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             > 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessEqualTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessEqualTransformer.java
index 9b322c7297..488ddd24b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessEqualTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessEqualTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class CompareLessEqualTransformer extends CompareBinaryTransformer {
 
@@ -40,7 +41,7 @@ public class CompareLessEqualTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             <= 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessThanTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessThanTransformer.java
index 19a65ce8ef..181a8a55a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessThanTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareLessThanTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class CompareLessThanTransformer extends CompareBinaryTransformer {
 
@@ -40,7 +41,7 @@ public class CompareLessThanTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             < 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareNonEqualTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareNonEqualTransformer.java
index 65bed1face..442a6b5cca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareNonEqualTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/CompareNonEqualTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.binary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 
 public class CompareNonEqualTransformer extends CompareBinaryTransformer {
@@ -42,7 +43,7 @@ public class CompareNonEqualTransformer extends CompareBinaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        compare(
+        TransformUtils.compare(
                 leftPointReader.currentBinary().getStringValue(),
                 rightPointReader.currentBinary().getStringValue())
             != 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/BetweenTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/BetweenTransformer.java
index 70f8ef0bba..892f8991ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/BetweenTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/BetweenTransformer.java
@@ -22,6 +22,7 @@
 package org.apache.iotdb.db.mpp.transformation.dag.transformer.ternary;
 
 import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 
 public class BetweenTransformer extends CompareTernaryTransformer {
 
@@ -54,11 +55,11 @@ public class BetweenTransformer extends CompareTernaryTransformer {
   @Override
   protected Evaluator constructTextEvaluator() {
     return () ->
-        ((compare(
+        ((TransformUtils.compare(
                         firstPointReader.currentBinary().getStringValue(),
                         secondPointReader.currentBinary().getStringValue())
                     >= 0)
-                && (compare(
+                && (TransformUtils.compare(
                         firstPointReader.currentBinary().getStringValue(),
                         thirdPointReader.currentBinary().getStringValue())
                     <= 0))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
index c7b97203e9..8aecaebcf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
 import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
 import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +60,26 @@ public class UDTFExecutor {
       List<String> childExpressions,
       List<TSDataType> childExpressionDataTypes,
       Map<String, String> attributes) {
+    reflectAndValidateUDF(childExpressions, childExpressionDataTypes, attributes);
+    configurations.check();
+
+    // Mappable UDF does not need PointCollector
+    if (!AccessStrategy.AccessStrategyType.MAPPABLE_ROW_BY_ROW.equals(
+        configurations.getAccessStrategy().getAccessStrategyType())) {
+      collector =
+          ElasticSerializableTVList.newElasticSerializableTVList(
+              UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType()),
+              queryId,
+              collectorMemoryBudgetInMB,
+              1);
+    }
+  }
+
+  private void reflectAndValidateUDF(
+      List<String> childExpressions,
+      List<TSDataType> childExpressionDataTypes,
+      Map<String, String> attributes) {
+
     udtf = (UDTF) UDFRegistrationService.getInstance().reflect(functionName);
 
     final UDFParameters parameters =
@@ -78,14 +99,6 @@ public class UDTFExecutor {
     } catch (Exception e) {
       onError("beforeStart(UDFParameters, UDTFConfigurations)", e);
     }
-    configurations.check();
-
-    collector =
-        ElasticSerializableTVList.newElasticSerializableTVList(
-            UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType()),
-            queryId,
-            collectorMemoryBudgetInMB,
-            1);
   }
 
   public void execute(Row row, boolean isCurrentRowNull) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFTypeInferrer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFInformationInferrer.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFTypeInferrer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFInformationInferrer.java
index c4e8c374e4..bf702ac938 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFTypeInferrer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/udf/UDTFInformationInferrer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.udf.api.UDTF;
 import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
 import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
 import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,13 +36,13 @@ import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 
-public class UDTFTypeInferrer {
+public class UDTFInformationInferrer {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFTypeInferrer.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFInformationInferrer.class);
 
   protected final String functionName;
 
-  public UDTFTypeInferrer(String functionName) {
+  public UDTFInformationInferrer(String functionName) {
     this.functionName = functionName;
   }
 
@@ -50,22 +51,9 @@ public class UDTFTypeInferrer {
       List<TSDataType> childExpressionDataTypes,
       Map<String, String> attributes) {
     try {
-      UDTF udtf = (UDTF) UDFRegistrationService.getInstance().reflect(functionName);
-
-      UDFParameters parameters =
-          new UDFParameters(
-              childExpressions,
-              UDFDataTypeTransformer.transformToUDFDataTypeList(childExpressionDataTypes),
-              attributes);
-      udtf.validate(new UDFParameterValidator(parameters));
-
-      // use ZoneId.systemDefault() because UDF's data type is ZoneId independent
-      UDTFConfigurations configurations = new UDTFConfigurations(ZoneId.systemDefault());
-      udtf.beforeStart(parameters, configurations);
-
-      udtf.beforeDestroy();
-
-      return UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType());
+      return UDFDataTypeTransformer.transformToTsDataType(
+          reflectAndGetConfigurations(childExpressions, childExpressionDataTypes, attributes)
+              .getOutputDataType());
     } catch (Exception e) {
       LOGGER.warn("Error occurred during inferring UDF data type", e);
       throw new SemanticException(
@@ -73,4 +61,42 @@ public class UDTFTypeInferrer {
               + e);
     }
   }
+
+  public AccessStrategy getAccessStrategy( // returns the access strategy held by the UDTF's configurations
+      List<String> childExpressions,
+      List<TSDataType> childExpressionDataTypes,
+      Map<String, String> attributes) {
+    try {
+
+      return reflectAndGetConfigurations(childExpressions, childExpressionDataTypes, attributes)
+          .getAccessStrategy();
+    } catch (Exception e) { // any failure is logged, then rethrown as a SemanticException
+      LOGGER.warn("Error occurred during getting UDF access strategy", e);
+      throw new SemanticException(
+          String.format(
+                  "Error occurred during getting UDF access strategy: %s", System.lineSeparator())
+              + e); // e's string form is appended to the message; e is not passed as a cause
+    }
+  }
+
+  private UDTFConfigurations reflectAndGetConfigurations( // shared by the type and strategy lookups
+      List<String> childExpressions,
+      List<TSDataType> childExpressionDataTypes,
+      Map<String, String> attributes)
+      throws Exception {
+    UDTF udtf = (UDTF) UDFRegistrationService.getInstance().reflect(functionName); // lookup by name
+
+    UDFParameters parameters =
+        new UDFParameters(
+            childExpressions,
+            UDFDataTypeTransformer.transformToUDFDataTypeList(childExpressionDataTypes),
+            attributes);
+    udtf.validate(new UDFParameterValidator(parameters)); // validation runs before beforeStart
+
+    // use ZoneId.systemDefault() because UDF's data type is ZoneId independent
+    UDTFConfigurations configurations = new UDTFConfigurations(ZoneId.systemDefault());
+    udtf.beforeStart(parameters, configurations); // presumably where the UDTF fills configurations
+    udtf.beforeDestroy(); // NOTE(review): not reached if validate/beforeStart throws (no finally)
+    return configurations;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
new file mode 100644
index 0000000000..fd426281ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.util;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class TransformUtils { // static helpers; compare() is called by the Compare*/Between transformers
+
+  public static int compare(CharSequence cs1, CharSequence cs2) { // <0, 0 or >0; NPE if either is null
+    if (Objects.requireNonNull(cs1) == Objects.requireNonNull(cs2)) { // same reference
+      return 0;
+    }
+
+    if (cs1.getClass() == cs2.getClass() && cs1 instanceof Comparable) { // e.g. String vs String
+      return ((Comparable<Object>) cs1).compareTo(cs2); // unchecked cast; delegates to the class's order
+    }
+
+    for (int i = 0, len = Math.min(cs1.length(), cs2.length()); i < len; i++) { // common prefix only
+      char a = cs1.charAt(i);
+      char b = cs2.charAt(i);
+      if (a != b) {
+        return a - b; // difference of the first mismatching chars
+      }
+    }
+
+    return cs1.length() - cs2.length(); // equal prefix: the shorter sequence orders first
+  }
+
+  public static Column transformConstantOperandToColumn(ConstantOperand constantOperand) {
+    Validate.notNull(constantOperand); // rejects a null operand up front
+
+    try { // parse the operand's text by its data type, then wrap the value in a 1-position column
+      Object value =
+          CommonUtils.parseValue(
+              constantOperand.getDataType(), constantOperand.getExpressionString());
+      if (value == null) { // parseValue produced no value for this operand
+        throw new UnsupportedOperationException(
+            "Invalid constant operand: " + constantOperand.getExpressionString());
+      }
+
+      switch (constantOperand.getDataType()) { // boolean[] {false}: presumably the is-null mask - confirm
+        case INT32:
+          return new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {(int) value});
+        case INT64:
+          return new LongColumn(1, Optional.of(new boolean[] {false}), new long[] {(long) value});
+        case FLOAT:
+          return new FloatColumn(
+              1, Optional.of(new boolean[] {false}), new float[] {(float) value});
+        case DOUBLE:
+          return new DoubleColumn(
+              1, Optional.of(new boolean[] {false}), new double[] {(double) value});
+        case TEXT:
+          return new BinaryColumn(
+              1, Optional.of(new boolean[] {false}), new Binary[] {(Binary) value});
+        case BOOLEAN:
+          return new BooleanColumn(
+              1, Optional.of(new boolean[] {false}), new boolean[] {(boolean) value});
+        default: // any data type not listed above
+          throw new UnSupportedDataTypeException(
+              "Unsupported type: " + constantOperand.getDataType());
+      }
+    } catch (QueryProcessException e) { // rethrown unchecked with e kept as the cause
+      throw new UnsupportedOperationException(e);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BinaryType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BinaryType.java
new file mode 100644
index 0000000000..d249a94b21
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BinaryType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class BinaryType implements Type { // Type implementation for Binary values
+  private static final BinaryType INSTANCE = new BinaryType(); // the one shared instance
+
+  private BinaryType() {} // private: obtain the instance through getInstance()
+
+  @Override
+  public Binary getBinary(Column c, int position) {
+    return c.getBinary(position); // direct read, no conversion
+  }
+
+  @Override
+  public void writeBinary(ColumnBuilder builder, Binary value) {
+    builder.writeBinary(value); // direct write, no conversion
+  }
+
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new BinaryColumnBuilder(null, expectedEntries); // first argument is passed as null
+  }
+
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.BINARY;
+  }
+
+  public static BinaryType getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BooleanType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BooleanType.java
new file mode 100644
index 0000000000..1f48a3fca1
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/BooleanType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class BooleanType implements Type { // Type implementation for boolean values
+
+  private static final BooleanType INSTANCE = new BooleanType(); // the one shared instance
+
+  private BooleanType() {} // private: obtain the instance through getInstance()
+
+  @Override
+  public boolean getBoolean(Column c, int position) {
+    return c.getBoolean(position); // direct read, no conversion
+  }
+
+  @Override
+  public void writeBoolean(ColumnBuilder builder, boolean value) {
+    builder.writeBoolean(value); // direct write, no conversion
+  }
+
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new BooleanColumnBuilder(null, expectedEntries); // first argument is passed as null
+  }
+
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.BOOLEAN;
+  }
+
+  public static BooleanType getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/DoubleType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/DoubleType.java
new file mode 100644
index 0000000000..eca8626186
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/DoubleType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+
+public class DoubleType implements Type { // Type implementation backed by double columns
+
+  private static final DoubleType INSTANCE = new DoubleType(); // the one shared instance
+
+  private DoubleType() {} // private: obtain the instance through getInstance()
+
+  @Override
+  public int getInt(Column c, int position) {
+    return (int) c.getDouble(position); // narrowing cast: the fractional part is dropped
+  }
+
+  @Override
+  public long getLong(Column c, int position) {
+    return (long) c.getDouble(position); // narrowing cast: the fractional part is dropped
+  }
+
+  @Override
+  public float getFloat(Column c, int position) {
+    return (float) c.getDouble(position); // narrowing cast: may lose precision
+  }
+
+  @Override
+  public double getDouble(Column c, int position) {
+    return c.getDouble(position); // direct read, no conversion
+  }
+
+  @Override
+  public void writeInt(ColumnBuilder builder, int value) {
+    builder.writeDouble(value); // int is widened to double
+  }
+
+  @Override
+  public void writeLong(ColumnBuilder builder, long value) {
+    builder.writeDouble(value); // long is widened to double; large magnitudes may be rounded
+  }
+
+  @Override
+  public void writeFloat(ColumnBuilder builder, float value) {
+    builder.writeDouble(value); // float is widened to double
+  }
+
+  @Override
+  public void writeDouble(ColumnBuilder builder, double value) {
+    builder.writeDouble(value); // direct write, no conversion
+  }
+
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new DoubleColumnBuilder(null, expectedEntries); // first argument is passed as null
+  }
+
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.DOUBLE;
+  }
+
+  public static DoubleType getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/FloatType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/FloatType.java
new file mode 100644
index 0000000000..5f93fb53e7
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/FloatType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+
+public class FloatType implements Type { // Type implementation backed by float columns
+
+  private static final FloatType INSTANCE = new FloatType(); // the one shared instance
+
+  private FloatType() {} // private: obtain the instance through getInstance()
+
+  @Override
+  public int getInt(Column c, int position) {
+    return (int) c.getFloat(position); // narrowing cast: the fractional part is dropped
+  }
+
+  @Override
+  public long getLong(Column c, int position) {
+    return (long) c.getFloat(position); // narrowing cast: the fractional part is dropped
+  }
+
+  @Override
+  public float getFloat(Column c, int position) {
+    return c.getFloat(position); // direct read, no conversion
+  }
+
+  @Override
+  public double getDouble(Column c, int position) {
+    return c.getFloat(position); // float is widened to double
+  }
+
+  @Override
+  public void writeInt(ColumnBuilder builder, int value) {
+    builder.writeFloat(value); // int is widened to float; large magnitudes may be rounded
+  }
+
+  @Override
+  public void writeLong(ColumnBuilder builder, long value) {
+    builder.writeFloat(value); // long is widened to float; large magnitudes may be rounded
+  }
+
+  @Override
+  public void writeFloat(ColumnBuilder builder, float value) {
+    builder.writeFloat(value); // direct write, no conversion
+  }
+
+  @Override
+  public void writeDouble(ColumnBuilder builder, double value) {
+    builder.writeFloat((float) value); // narrowing cast: may lose precision
+  }
+
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new FloatColumnBuilder(null, expectedEntries); // first argument is passed as null
+  }
+
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.FLOAT;
+  }
+
+  public static FloatType getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/IntType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/IntType.java
new file mode 100644
index 0000000000..0b90092ecb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/IntType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+
+/**
+ * {@link Type} implementation for int-valued columns.
+ *
+ * <p>Getters read the cell through {@code Column.getInt} and widen it to the requested primitive;
+ * writers cast their argument to {@code int} and hand it to {@code ColumnBuilder.writeInt}. The
+ * casts from {@code long}, {@code float} and {@code double} are plain Java narrowing casts and
+ * may lose information. The constructor is private and a single shared instance is exposed
+ * through {@link #getInstance()}.
+ */
+public class IntType implements Type {
+
+  private static final IntType SINGLETON = new IntType();
+
+  private IntType() {}
+
+  /** Returns the shared {@code IntType} instance. */
+  public static IntType getInstance() {
+    return SINGLETON;
+  }
+
+  /** Identifies this type as {@link TypeEnum#INT32}. */
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.INT32;
+  }
+
+  /** Creates an {@link IntColumnBuilder}, passing {@code null} as its first argument. */
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new IntColumnBuilder(null, expectedEntries);
+  }
+
+  /** Reads the int at {@code position} unchanged. */
+  @Override
+  public int getInt(Column c, int position) {
+    return c.getInt(position);
+  }
+
+  /** Reads the int at {@code position} and widens it to {@code long}. */
+  @Override
+  public long getLong(Column c, int position) {
+    final int stored = c.getInt(position);
+    return (long) stored;
+  }
+
+  /** Reads the int at {@code position} and converts it to {@code float}. */
+  @Override
+  public float getFloat(Column c, int position) {
+    final int stored = c.getInt(position);
+    return (float) stored;
+  }
+
+  /** Reads the int at {@code position} and widens it to {@code double}. */
+  @Override
+  public double getDouble(Column c, int position) {
+    final int stored = c.getInt(position);
+    return (double) stored;
+  }
+
+  /** Writes {@code value} as is. */
+  @Override
+  public void writeInt(ColumnBuilder builder, int value) {
+    builder.writeInt(value);
+  }
+
+  /** Casts {@code value} to {@code int} and writes it. */
+  @Override
+  public void writeLong(ColumnBuilder builder, long value) {
+    final int narrowed = (int) value;
+    builder.writeInt(narrowed);
+  }
+
+  /** Casts {@code value} to {@code int} and writes it. */
+  @Override
+  public void writeFloat(ColumnBuilder builder, float value) {
+    final int narrowed = (int) value;
+    builder.writeInt(narrowed);
+  }
+
+  /** Casts {@code value} to {@code int} and writes it. */
+  @Override
+  public void writeDouble(ColumnBuilder builder, double value) {
+    final int narrowed = (int) value;
+    builder.writeInt(narrowed);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/LongType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/LongType.java
new file mode 100644
index 0000000000..451bd0f26e
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/LongType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+
+/**
+ * {@link Type} implementation for long-valued columns.
+ *
+ * <p>Getters read the cell through {@code Column.getLong} and convert it to the requested
+ * primitive; writers convert their argument to {@code long} and hand it to
+ * {@code ColumnBuilder.writeLong}. All conversions are plain Java primitive casts, so the
+ * narrowing ones may lose information. The constructor is private and a single shared instance
+ * is exposed through {@link #getInstance()}.
+ */
+public class LongType implements Type {
+
+  private static final LongType SINGLETON = new LongType();
+
+  private LongType() {}
+
+  /** Returns the shared {@code LongType} instance. */
+  public static LongType getInstance() {
+    return SINGLETON;
+  }
+
+  /** Identifies this type as {@link TypeEnum#INT64}. */
+  @Override
+  public TypeEnum getTypeEnum() {
+    return TypeEnum.INT64;
+  }
+
+  /** Creates a {@link LongColumnBuilder}, passing {@code null} as its first argument. */
+  @Override
+  public ColumnBuilder createColumnBuilder(int expectedEntries) {
+    return new LongColumnBuilder(null, expectedEntries);
+  }
+
+  /** Reads the long at {@code position} and casts it to {@code int}. */
+  @Override
+  public int getInt(Column c, int position) {
+    final long stored = c.getLong(position);
+    return (int) stored;
+  }
+
+  /** Reads the long at {@code position} unchanged. */
+  @Override
+  public long getLong(Column c, int position) {
+    return c.getLong(position);
+  }
+
+  /** Reads the long at {@code position} and converts it to {@code float}. */
+  @Override
+  public float getFloat(Column c, int position) {
+    final long stored = c.getLong(position);
+    return (float) stored;
+  }
+
+  /** Reads the long at {@code position} and converts it to {@code double}. */
+  @Override
+  public double getDouble(Column c, int position) {
+    final long stored = c.getLong(position);
+    return (double) stored;
+  }
+
+  /** Widens {@code value} to {@code long} and writes it. */
+  @Override
+  public void writeInt(ColumnBuilder builder, int value) {
+    builder.writeLong((long) value);
+  }
+
+  /** Writes {@code value} as is. */
+  @Override
+  public void writeLong(ColumnBuilder builder, long value) {
+    builder.writeLong(value);
+  }
+
+  /** Casts {@code value} to {@code long} and writes it. */
+  @Override
+  public void writeFloat(ColumnBuilder builder, float value) {
+    final long converted = (long) value;
+    builder.writeLong(converted);
+  }
+
+  /** Casts {@code value} to {@code long} and writes it. */
+  @Override
+  public void writeDouble(ColumnBuilder builder, double value) {
+    final long converted = (long) value;
+    builder.writeLong(converted);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/Type.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/Type.java
new file mode 100644
index 0000000000..4458cfa6d1
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/Type.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.type;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public interface Type {
+
+  /**
+   * Gets a boolean at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default boolean getBoolean(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Gets an int at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default int getInt(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Gets a long at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default long getLong(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Gets a float at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default float getFloat(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Gets a double at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default double getDouble(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Gets a Binary at {@code position} of column {@code c}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default Binary getBinary(Column c, int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+  /**
+   * Gets an Object at {@code position} of column {@code c}. The default implementation delegates
+   * to {@code c.getObject(position)}.
+   */
+  default Object getObject(Column c, int position) {
+    return c.getObject(position);
+  }
+
+  /**
+   * Writes a boolean to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeBoolean(ColumnBuilder builder, boolean value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes an int to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeInt(ColumnBuilder builder, int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes a long to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeLong(ColumnBuilder builder, long value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes a float to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeFloat(ColumnBuilder builder, float value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes a double to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeDouble(ColumnBuilder builder, double value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes a Binary to the current entry of {@code builder}.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default void writeBinary(ColumnBuilder builder, Binary value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Writes an Object to the current entry of {@code builder}. The default implementation
+   * delegates to {@code builder.writeObject(value)}.
+   */
+  default void writeObject(ColumnBuilder builder, Object value) {
+    builder.writeObject(value);
+  }
+
+  /**
+   * Creates the preferred column builder for this type. This is the builder used to store values
+   * after an expression projection within the query.
+   *
+   * @param expectedEntries forwarded to the builder's constructor by the implementations;
+   *     presumably a sizing hint for the number of values to be written (TODO confirm)
+   */
+  ColumnBuilder createColumnBuilder(int expectedEntries);
+
+  TypeEnum getTypeEnum();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeEnum.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeEnum.java
index 9f4bc80438..83b91495ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeEnum.java
@@ -17,26 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.tsfile.read.common.type;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+public enum TypeEnum { // tag returned by Type#getTypeEnum() to identify a Type implementation
+  INT32, // reported by IntType
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+  INT64, // reported by LongType
 
-public abstract class LeafOperand extends Expression {
+  FLOAT, // reported by FloatType
 
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
-  }
+  DOUBLE, // presumably reported by DoubleType (TODO confirm)
 
-  @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
-  }
+  BOOLEAN, // presumably reported by BooleanType (TODO confirm)
+
+  BINARY // presumably reported by BinaryType (TODO confirm)
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeFactory.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeFactory.java
index 9f4bc80438..d292465f55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/LeafOperand.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/type/TypeFactory.java
@@ -17,26 +17,28 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.expression.leaf;
+package org.apache.iotdb.tsfile.read.common.type;
 
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFExecutor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.time.ZoneId;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public abstract class LeafOperand extends Expression {
-
-  @Override
-  public final List<Expression> getExpressions() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public final void constructUdfExecutors(
-      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
-    // nothing to do
+/** Static lookup from a {@link TSDataType} to the shared {@link Type} instance that handles it. */
+public class TypeFactory {
+  /**
+   * Returns the singleton {@link Type} for the given data type.
+   *
+   * <p>INT32, INT64, FLOAT, DOUBLE and BOOLEAN map to the type of the same name; TEXT maps to
+   * {@link BinaryType}.
+   *
+   * @param tsDataType the data type to resolve
+   * @return the shared {@link Type} instance for {@code tsDataType}
+   * @throws UnsupportedOperationException if {@code tsDataType} is none of the values above
+   */
+  public static Type getType(TSDataType tsDataType) {
+    final Type resolved;
+    switch (tsDataType) {
+      case INT32:
+        resolved = IntType.getInstance();
+        break;
+      case INT64:
+        resolved = LongType.getInstance();
+        break;
+      case FLOAT:
+        resolved = FloatType.getInstance();
+        break;
+      case DOUBLE:
+        resolved = DoubleType.getInstance();
+        break;
+      case BOOLEAN:
+        resolved = BooleanType.getInstance();
+        break;
+      case TEXT:
+        resolved = BinaryType.getInstance();
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Invalid TSDataType for TypeFactory: %s", tsDataType));
+    }
+    return resolved;
   }
 }