You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/06/04 02:54:13 UTC

[iotdb] branch master updated: [IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c4b801  [IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)
6c4b801 is described below

commit 6c4b8010b4a85077ac5e79c9c0f1932e3df41a42
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Thu Jun 3 21:53:46 2021 -0500

    [IOTDB-1400] Support arithmetic operations in SELECT clauses (#3288)
    
    * ArithmeticBinaryTransformer and its subclasses
    
    * ArithmeticNegationTransformer
    
    * refactor UDTFDataSet
    
    * support + - * / %
    
    * improve code style
    
    * add IT
    
    * add docs
---
 .../DML-Data-Manipulation-Language.md              |  42 ++++
 .../DML-Data-Manipulation-Language.md              |  44 ++++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   2 +-
 .../db/qp/physical/crud/RawDataQueryPlan.java      |   2 +-
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |  68 +++--
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |   4 +-
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 160 ++++++++----
 .../iotdb/db/query/expression/Expression.java      |   5 -
 .../query/expression/binary/BinaryExpression.java  |  18 +-
 .../query/expression/unary/FunctionExpression.java |  20 --
 ...inusExpression.java => NegationExpression.java} |  14 +-
 .../query/expression/unary/TimeSeriesOperand.java  |  10 -
 .../transformer/ArithmeticAdditionTransformer.java |  35 +++
 .../transformer/ArithmeticBinaryTransformer.java   | 110 +++++++++
 .../transformer/ArithmeticDivisionTransformer.java |  35 +++
 .../transformer/ArithmeticModuloTransformer.java   |  35 +++
 .../ArithmeticMultiplicationTransformer.java       |  35 +++
 .../transformer/ArithmeticNegationTransformer.java |  67 +++++
 .../ArithmeticSubtractionTransformer.java          |  35 +++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   9 +-
 .../iotdb/db/integration/IoTDBArithmeticIT.java    | 275 +++++++++++++++++++++
 21 files changed, 899 insertions(+), 126 deletions(-)

diff --git a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 65d6e64..4ded88e 100644
--- a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -389,6 +389,48 @@ It costs 0.014s
 
 Please refer to [UDF (User Defined Function)](../Advanced-Features/UDF-User-Defined-Function.md).
 
+#### Arithmetic query
+
+##### Unary arithmetic operators
+
+Supported operators: `+`, `-`
+
+Supported input data types: `INT32`, `INT64`, `FLOAT` and `DOUBLE`
+
+Output data type: consistent with the input data type
+
+##### Binary arithmetic operators
+
+Supported operators: `+`, `-`, `*`, `/`, `%`
+
+Supported input data types: `INT32`, `INT64`, `FLOAT` and `DOUBLE`
+
+Output data type: `DOUBLE`
+
+Note: A binary arithmetic operation produces an output value at a given timestamp only when both its left operand and its right operand are not `null` at that timestamp.
+
+##### Example
+
+```sql
+select s1, - s1, s2, + s2, s1 + s2, s1 - s2, s1 * s2, s1 / s2, s1 % s2 from root.sg.d1
+```
+
+Result:
+
+```
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|                         Time|root.sg.d1.s1|-root.sg.d1.s1|root.sg.d1.s2|root.sg.d1.s2|root.sg.d1.s1 + root.sg.d1.s2|root.sg.d1.s1 - root.sg.d1.s2|root.sg.d1.s1 * root.sg.d1.s2|root.sg.d1.s1 / root.sg.d1.s2|root.sg.d1.s1 % root.sg.d1.s2|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.001+08:00|          1.0|          -1.0|          1.0|          1.0|                          2.0|                          0.0|                          1.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.002+08:00|          2.0|          -2.0|          2.0|          2.0|                          4.0|                          0.0|                          4.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.003+08:00|          3.0|          -3.0|          3.0|          3.0|                          6.0|                          0.0|                          9.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.004+08:00|          4.0|          -4.0|          4.0|          4.0|                          8.0|                          0.0|                         16.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.005+08:00|          5.0|          -5.0|          5.0|          5.0|                         10.0|                          0.0|                         25.0|                          1.0|                          0.0|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+Total line number = 5
+It costs 0.014s
+```
+
 ### Aggregate Query
 
 This section mainly introduces the related examples of aggregate query.
diff --git a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 6964c7b..3c8648e 100644
--- a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -1386,6 +1386,50 @@ It costs 0.014s
 
 请参考 [UDF (用户定义函数)](../Advanced-Features/UDF-User-Defined-Function.md)。
 
+#### 算术运算查询
+
+##### 一元算术运算符
+
+支持的运算符:`+`, `-`
+
+输入数据类型要求:`INT32`, `INT64`, `FLOAT`和 `DOUBLE`
+
+输出数据类型:与输入数据类型一致
+
+##### 二元算术运算符
+
+支持的运算符:`+`, `-`, `*`, `/`,  `%`
+
+输入数据类型要求:`INT32`, `INT64`, `FLOAT`和 `DOUBLE`
+
+输出数据类型:`DOUBLE`
+
+注意:当某个时间戳下左操作数和右操作数都不为空(`null`)时,二元运算操作才会有输出结果
+
+##### 示例
+
+例如:
+
+```sql
+select s1, - s1, s2, + s2, s1 + s2, s1 - s2, s1 * s2, s1 / s2, s1 % s2 from root.sg.d1
+```
+
+结果:
+
+``` 
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|                         Time|root.sg.d1.s1|-root.sg.d1.s1|root.sg.d1.s2|root.sg.d1.s2|root.sg.d1.s1 + root.sg.d1.s2|root.sg.d1.s1 - root.sg.d1.s2|root.sg.d1.s1 * root.sg.d1.s2|root.sg.d1.s1 / root.sg.d1.s2|root.sg.d1.s1 % root.sg.d1.s2|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.001+08:00|          1.0|          -1.0|          1.0|          1.0|                          2.0|                          0.0|                          1.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.002+08:00|          2.0|          -2.0|          2.0|          2.0|                          4.0|                          0.0|                          4.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.003+08:00|          3.0|          -3.0|          3.0|          3.0|                          6.0|                          0.0|                          9.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.004+08:00|          4.0|          -4.0|          4.0|          4.0|                          8.0|                          0.0|                         16.0|                          1.0|                          0.0|
+|1970-01-01T08:00:00.005+08:00|          5.0|          -5.0|          5.0|          5.0|                         10.0|                          0.0|                         25.0|                          1.0|                          0.0|
++-----------------------------+-------------+--------------+-------------+-------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+Total line number = 5
+It costs 0.014s
+```
+
 #### 错误处理
 
 当LIMIT / SLIMIT的参数N / SN超过结果集的大小时,IoTDB将按预期返回所有结果。 例如,原始SQL语句的查询结果由六行组成,我们通过LIMIT子句选择前100行:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index c28d9c6..c2c3157 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -113,7 +113,7 @@ public abstract class QueryPlan extends PhysicalPlan {
     alignByTime = align;
   }
 
-  public void addPathToIndex(String columnName, Integer index) {
+  public void setColumnNameToDatasetOutputIndex(String columnName, Integer index) {
     pathToIndex.put(columnName, index);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 74b6b25..49d6da5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -89,7 +89,7 @@ public class RawDataQueryPlan extends QueryPlan {
 
       String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
       if (!columnForDisplaySet.contains(columnForDisplay)) {
-        addPathToIndex(columnForDisplay, getPathToIndex().size());
+        setColumnNameToDatasetOutputIndex(columnForDisplay, getPathToIndex().size());
         columnForDisplaySet.add(columnForDisplay);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index bd2cc94..0ae2109 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -26,12 +26,15 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.time.ZoneId;
@@ -51,8 +54,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   protected Map<String, UDTFExecutor> columnName2Executor = new HashMap<>();
   protected Map<Integer, UDTFExecutor> originalOutputColumnIndex2Executor = new HashMap<>();
 
-  protected List<String> datasetOutputColumnIndex2UdfColumnName = new ArrayList<>();
-  protected List<String> datasetOutputColumnIndex2RawQueryColumnName = new ArrayList<>();
+  protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
 
   protected Map<String, Integer> pathNameToReaderIndex;
 
@@ -95,12 +97,9 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
       String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
       if (!columnForDisplaySet.contains(columnForDisplay)) {
-        addPathToIndex(columnForDisplay, getPathToIndex().size());
-        if (isUdf) {
-          addUdfOutputColumn(columnForDisplay);
-        } else {
-          addRawQueryOutputColumn(columnForDisplay);
-        }
+        int datasetOutputIndex = getPathToIndex().size();
+        setColumnNameToDatasetOutputIndex(columnForDisplay, datasetOutputIndex);
+        setDatasetOutputIndexToResultColumnIndex(datasetOutputIndex, originalIndex);
         columnForDisplaySet.add(columnForDisplay);
       }
     }
@@ -108,6 +107,11 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
     setPathNameToReaderIndex(pathNameToReaderIndex);
   }
 
+  private void setDatasetOutputIndexToResultColumnIndex(
+      int datasetOutputIndex, Integer originalIndex) {
+    datasetOutputIndexToResultColumnIndex.put(datasetOutputIndex, originalIndex);
+  }
+
   @Override
   public void constructUdfExecutors(List<ResultColumn> resultColumns) {
     for (int i = 0; i < resultColumns.size(); ++i) {
@@ -152,34 +156,56 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
     }
   }
 
+  public TSDataType getOriginalOutputColumnDataType(int originalOutputColumn) {
+    Expression expression = resultColumns.get(originalOutputColumn).getExpression();
+    // UDF query
+    if (expression instanceof FunctionExpression) {
+      return getExecutorByOriginalOutputColumnIndex(originalOutputColumn)
+          .getConfigurations()
+          .getOutputDataType();
+    }
+    // arithmetic binary query
+    if (expression instanceof BinaryExpression) {
+      return TSDataType.DOUBLE;
+    }
+    // arithmetic negation query
+    if (expression instanceof NegationExpression) {
+      return getDeduplicatedDataTypes()
+          .get(getReaderIndex(((NegationExpression) expression).getExpression().toString()));
+    }
+    // raw query
+    return getDeduplicatedDataTypes().get(getReaderIndex(expression.toString()));
+  }
+
   public UDTFExecutor getExecutorByOriginalOutputColumnIndex(int originalOutputColumn) {
     return originalOutputColumnIndex2Executor.get(originalOutputColumn);
   }
 
+  public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) {
+    return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex));
+  }
+
   public UDTFExecutor getExecutorByDataSetOutputColumnIndex(int datasetOutputIndex) {
-    return columnName2Executor.get(datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex));
+    return columnName2Executor.get(
+        getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName());
   }
 
   public String getRawQueryColumnNameByDatasetOutputColumnIndex(int datasetOutputIndex) {
-    return datasetOutputColumnIndex2RawQueryColumnName.get(datasetOutputIndex);
+    return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName();
   }
 
   public boolean isUdfColumn(int datasetOutputIndex) {
-    return datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex) != null;
+    return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getExpression()
+        instanceof FunctionExpression;
   }
 
-  public int getReaderIndex(String pathName) {
-    return pathNameToReaderIndex.get(pathName);
+  public boolean isArithmeticColumn(int datasetOutputIndex) {
+    Expression expression = getResultColumnByDatasetOutputIndex(datasetOutputIndex).getExpression();
+    return expression instanceof BinaryExpression || expression instanceof NegationExpression;
   }
 
-  public void addUdfOutputColumn(String udfDatasetOutputColumn) {
-    datasetOutputColumnIndex2UdfColumnName.add(udfDatasetOutputColumn);
-    datasetOutputColumnIndex2RawQueryColumnName.add(null);
-  }
-
-  public void addRawQueryOutputColumn(String rawQueryOutputColumn) {
-    datasetOutputColumnIndex2UdfColumnName.add(null);
-    datasetOutputColumnIndex2RawQueryColumnName.add(rawQueryOutputColumn);
+  public int getReaderIndex(String pathName) {
+    return pathNameToReaderIndex.get(pathName);
   }
 
   public void setPathNameToReaderIndex(Map<String, Integer> pathNameToReaderIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 132fdba..9d110f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -221,7 +221,7 @@ import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
 import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
 import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.expression.unary.MinusExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -1053,7 +1053,7 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
     }
     if (context.unary != null) {
       return context.MINUS() != null
-          ? new MinusExpression(parseExpression(context.expression(0)))
+          ? new NegationExpression(parseExpression(context.expression(0)))
           : parseExpression(context.expression(0));
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 19d81c6..670f2c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -24,12 +24,26 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
+import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
+import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
+import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
+import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
+import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
+import org.apache.iotdb.db.query.expression.unary.NegationExpression;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.input.InputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.RawQueryPointTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
@@ -80,7 +94,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
             readersOfSelectedSeries,
             cached);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
   /** execute without value filters */
@@ -102,24 +116,36 @@ public abstract class UDTFDataSet extends QueryDataSet {
             deduplicatedDataTypes,
             readersOfSelectedSeries);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
-  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
-  protected void initTransformers(float memoryBudgetInMB)
-      throws QueryProcessException, IOException {
-    int size = udtfPlan.getPathToIndex().size();
+  protected void initTransformers() throws QueryProcessException, IOException {
+    final float memoryBudgetForSingleWindowTransformer =
+        calculateMemoryBudgetForSingleWindowTransformer();
+    final int size = udtfPlan.getPathToIndex().size();
     transformers = new Transformer[size];
+    for (int i = 0; i < size; ++i) {
+      if (udtfPlan.isUdfColumn(i)) {
+        constructUdfTransformer(i, memoryBudgetForSingleWindowTransformer);
+      } else if (udtfPlan.isArithmeticColumn(i)) {
+        constructArithmeticTransformer(i);
+      } else {
+        constructRawQueryTransformer(i);
+      }
+    }
+  }
 
+  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
+  private float calculateMemoryBudgetForSingleWindowTransformer() {
+    int size = udtfPlan.getPathToIndex().size();
     int windowTransformerCount = 0;
     for (int i = 0; i < size; ++i) {
       if (udtfPlan.isUdfColumn(i)) {
-        AccessStrategy accessStrategy =
-            udtfPlan
-                .getExecutorByDataSetOutputColumnIndex(i)
-                .getConfigurations()
-                .getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
+        switch (udtfPlan
+            .getExecutorByDataSetOutputColumnIndex(i)
+            .getConfigurations()
+            .getAccessStrategy()
+            .getAccessStrategyType()) {
           case SLIDING_SIZE_WINDOW:
           case SLIDING_TIME_WINDOW:
             ++windowTransformerCount;
@@ -129,40 +155,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
         }
       }
     }
-    memoryBudgetInMB /= Math.max(windowTransformerCount, 1);
+    return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
+  }
 
-    for (int i = 0; i < size; ++i) {
-      if (udtfPlan.isUdfColumn(i)) {
-        UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(i);
-        int[] readerIndexes = calculateReaderIndexes(executor);
-        AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
-          case ROW_BY_ROW:
-            transformers[i] =
-                new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
-            break;
-          case SLIDING_SIZE_WINDOW:
-          case SLIDING_TIME_WINDOW:
-            transformers[i] =
-                new UDFQueryRowWindowTransformer(
-                    inputLayer.constructRowWindowReader(
-                        readerIndexes, accessStrategy, memoryBudgetInMB),
-                    executor);
-            break;
-          default:
-            throw new UnsupportedOperationException("Unsupported transformer access strategy");
-        }
-      } else {
-        transformers[i] =
-            new RawQueryPointTransformer(
-                inputLayer.constructPointReader(
-                    udtfPlan.getReaderIndex(
-                        udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(i))));
-      }
+  private void constructUdfTransformer(
+      int columnIndex, float memoryBudgetForSingleWindowTransformer)
+      throws QueryProcessException, IOException {
+    UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
+    int[] readerIndexes = calculateUdfReaderIndexes(executor);
+    AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+    switch (accessStrategy.getAccessStrategyType()) {
+      case ROW_BY_ROW:
+        transformers[columnIndex] =
+            new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+        break;
+      case SLIDING_SIZE_WINDOW:
+      case SLIDING_TIME_WINDOW:
+        transformers[columnIndex] =
+            new UDFQueryRowWindowTransformer(
+                inputLayer.constructRowWindowReader(
+                    readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
+                executor);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported transformer access strategy");
     }
   }
 
-  private int[] calculateReaderIndexes(UDTFExecutor executor) {
+  private int[] calculateUdfReaderIndexes(UDTFExecutor executor) {
     List<PartialPath> paths = executor.getExpression().getPaths();
     int[] readerIndexes = new int[paths.size()];
     for (int i = 0; i < readerIndexes.length; ++i) {
@@ -171,6 +191,62 @@ public abstract class UDTFDataSet extends QueryDataSet {
     return readerIndexes;
   }
 
+  private void constructArithmeticTransformer(int columnIndex) {
+    Expression expression =
+        udtfPlan.getResultColumnByDatasetOutputIndex(columnIndex).getExpression();
+
+    // unary expression
+    if (expression instanceof NegationExpression) {
+      transformers[columnIndex] =
+          new ArithmeticNegationTransformer(
+              constructPointReaderBySeriesName(
+                  ((NegationExpression) expression).getExpression().toString()));
+      return;
+    }
+
+    // binary expression
+    BinaryExpression binaryExpression = (BinaryExpression) expression;
+    if (binaryExpression instanceof AdditionExpression) {
+      transformers[columnIndex] =
+          new ArithmeticAdditionTransformer(
+              constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+              constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+    } else if (binaryExpression instanceof SubtractionExpression) {
+      transformers[columnIndex] =
+          new ArithmeticSubtractionTransformer(
+              constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+              constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+    } else if (binaryExpression instanceof MultiplicationExpression) {
+      transformers[columnIndex] =
+          new ArithmeticMultiplicationTransformer(
+              constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+              constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+    } else if (binaryExpression instanceof DivisionExpression) {
+      transformers[columnIndex] =
+          new ArithmeticDivisionTransformer(
+              constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+              constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+    } else if (binaryExpression instanceof ModuloExpression) {
+      transformers[columnIndex] =
+          new ArithmeticModuloTransformer(
+              constructPointReaderBySeriesName(binaryExpression.getLeftExpression().toString()),
+              constructPointReaderBySeriesName(binaryExpression.getRightExpression().toString()));
+    } else {
+      throw new UnsupportedOperationException(binaryExpression.toString());
+    }
+  }
+
+  private void constructRawQueryTransformer(int columnIndex) {
+    transformers[columnIndex] =
+        new RawQueryPointTransformer(
+            constructPointReaderBySeriesName(
+                udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex)));
+  }
+
+  private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
+    return inputLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+  }
+
   public void finalizeUDFs(long queryId) {
     udtfPlan.finalizeUDFExecutors(queryId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index b0c5f60..f610c5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -19,11 +19,9 @@
 
 package org.apache.iotdb.db.query.expression;
 
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
 import java.util.Set;
@@ -38,9 +36,6 @@ public interface Expression {
     return false;
   }
 
-  // TODO: implement this method
-  TSDataType dataType() throws MetadataException;
-
   void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions);
 
   void removeWildcards(WildcardsRemover wildcardsRemover, List<Expression> resultExpressions)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index cff5bc3..48f94f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,14 +38,9 @@ public abstract class BinaryExpression implements Expression {
     this.rightExpression = rightExpression;
   }
 
-  /**
-   * The result data type of all arithmetic operations will be DOUBLE.
-   *
-   * <p>TODO: This is just a simple implementation and should be optimized later.
-   */
   @Override
-  public final TSDataType dataType() {
-    return TSDataType.DOUBLE;
+  public boolean isTimeSeriesGeneratingFunctionExpression() {
+    return true;
   }
 
   @Override
@@ -108,6 +102,14 @@ public abstract class BinaryExpression implements Expression {
     rightExpression.collectPaths(pathSet);
   }
 
+  public Expression getLeftExpression() {
+    return leftExpression;
+  }
+
+  public Expression getRightExpression() {
+    return rightExpression;
+  }
+
   @Override
   public final String toString() {
     return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index a321f97..01d8686 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -19,15 +19,12 @@
 
 package org.apache.iotdb.db.query.expression.unary;
 
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -55,7 +52,6 @@ public class FunctionExpression implements Expression {
    */
   private List<Expression> expressions;
 
-  private List<TSDataType> dataTypes;
   private List<PartialPath> paths;
 
   private String expressionString;
@@ -113,12 +109,6 @@ public class FunctionExpression implements Expression {
   }
 
   @Override
-  public TSDataType dataType() {
-    // TODO: the expression type is determined in runtime
-    throw new NotImplementedException();
-  }
-
-  @Override
   public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
     List<List<Expression>> resultExpressionsForRecursionList = new ArrayList<>();
 
@@ -153,16 +143,6 @@ public class FunctionExpression implements Expression {
     }
   }
 
-  public List<TSDataType> getDataTypes() throws MetadataException {
-    if (dataTypes == null) {
-      dataTypes = new ArrayList<>();
-      for (Expression expression : expressions) {
-        dataTypes.add(expression.dataType());
-      }
-    }
-    return dataTypes;
-  }
-
   public List<PartialPath> getPaths() {
     if (paths == null) {
       paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
rename to server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index afd0017..675adb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -19,22 +19,20 @@
 
 package org.apache.iotdb.db.query.expression.unary;
 
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-public class MinusExpression implements Expression {
+public class NegationExpression implements Expression {
 
   protected Expression expression;
 
-  public MinusExpression(Expression expression) {
+  public NegationExpression(Expression expression) {
     this.expression = expression;
   }
 
@@ -43,8 +41,8 @@ public class MinusExpression implements Expression {
   }
 
   @Override
-  public TSDataType dataType() throws MetadataException {
-    return expression.dataType();
+  public boolean isTimeSeriesGeneratingFunctionExpression() {
+    return true;
   }
 
   @Override
@@ -52,7 +50,7 @@ public class MinusExpression implements Expression {
     List<Expression> resultExpressionsForRecursion = new ArrayList<>();
     expression.concat(prefixPaths, resultExpressionsForRecursion);
     for (Expression resultExpression : resultExpressionsForRecursion) {
-      resultExpressions.add(new MinusExpression(resultExpression));
+      resultExpressions.add(new NegationExpression(resultExpression));
     }
   }
 
@@ -62,7 +60,7 @@ public class MinusExpression implements Expression {
     List<Expression> resultExpressionsForRecursion = new ArrayList<>();
     expression.removeWildcards(wildcardsRemover, resultExpressionsForRecursion);
     for (Expression resultExpression : resultExpressionsForRecursion) {
-      resultExpressions.add(new MinusExpression(resultExpression));
+      resultExpressions.add(new NegationExpression(resultExpression));
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index e69353c..232d031 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -19,12 +19,10 @@
 
 package org.apache.iotdb.db.query.expression.unary;
 
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -48,14 +46,6 @@ public class TimeSeriesOperand implements Expression {
   }
 
   @Override
-  public TSDataType dataType() throws MetadataException {
-    if (dataType == null) {
-      dataType = IoTDB.metaManager.getSeriesType(path);
-    }
-    return dataType;
-  }
-
-  @Override
   public void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions) {
     for (PartialPath prefixPath : prefixPaths) {
       resultExpressions.add(new TimeSeriesOperand(prefixPath.concatPath(path)));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java
new file mode 100644
index 0000000..031f58d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticAdditionTransformer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose result is the sum of its two operands. */
+public class ArithmeticAdditionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * Creates an addition transformer over two operand readers.
+   *
+   * @param leftPointReader reader handed to the superclass as the left operand
+   * @param rightPointReader reader handed to the superclass as the right operand
+   */
+  public ArithmeticAdditionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Adds the two operands.
+   *
+   * @param leftOperand value on the left of {@code +}
+   * @param rightOperand value on the right of {@code +}
+   * @return {@code leftOperand + rightOperand}
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double sum = leftOperand + rightOperand;
+    return sum;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java
new file mode 100644
index 0000000..1ba4819
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticBinaryTransformer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * Base class for transformers that combine two input readers point by point. A point is produced
+ * only for timestamps present in both readers, and its value is always a {@code double}.
+ */
+public abstract class ArithmeticBinaryTransformer extends Transformer {
+
+  private final LayerPointReader leftPointReader;
+  private final LayerPointReader rightPointReader;
+
+  /**
+   * @param leftPointReader reader supplying the left operand
+   * @param rightPointReader reader supplying the right operand
+   */
+  protected ArithmeticBinaryTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    this.leftPointReader = leftPointReader;
+    this.rightPointReader = rightPointReader;
+  }
+
+  /**
+   * Moves both readers to their next common timestamp, evaluates the operation on the two current
+   * values and stores the outcome in {@code cachedDouble}.
+   *
+   * @return false if either reader is exhausted before a common timestamp is found
+   */
+  @Override
+  protected boolean cacheValue() throws QueryProcessException, IOException {
+    final boolean bothHavePoints = leftPointReader.next() && rightPointReader.next();
+    if (!bothHavePoints || !cacheTime()) {
+      return false;
+    }
+
+    final double leftOperand = castCurrentValueToDoubleOperand(leftPointReader);
+    final double rightOperand = castCurrentValueToDoubleOperand(rightPointReader);
+    cachedDouble = evaluate(leftOperand, rightOperand);
+
+    // the current points have been used; let both readers move on
+    leftPointReader.readyForNext();
+    rightPointReader.readyForNext();
+    return true;
+  }
+
+  /**
+   * Advances whichever reader is behind until both readers are positioned on the same timestamp,
+   * then records that timestamp in {@code cachedTime}.
+   *
+   * @return true if a timestamp shared by both readers was found, false if one reader ran out
+   */
+  private boolean cacheTime() throws IOException, QueryProcessException {
+    long leftTime = leftPointReader.currentTime();
+    long rightTime = rightPointReader.currentTime();
+
+    while (leftTime != rightTime) {
+      final boolean leftIsBehind = leftTime < rightTime;
+      final LayerPointReader laggingReader = leftIsBehind ? leftPointReader : rightPointReader;
+
+      // skip the point that has no counterpart in the other reader
+      laggingReader.readyForNext();
+      if (!laggingReader.next()) {
+        return false;
+      }
+
+      if (leftIsBehind) {
+        leftTime = laggingReader.currentTime();
+      } else {
+        rightTime = laggingReader.currentTime();
+      }
+    }
+
+    cachedTime = leftTime;
+    return true;
+  }
+
+  /**
+   * Computes the result of the concrete arithmetic operation.
+   *
+   * @param leftOperand current value of the left reader, widened to double
+   * @param rightOperand current value of the right reader, widened to double
+   * @return the value to expose for the current timestamp
+   */
+  protected abstract double evaluate(double leftOperand, double rightOperand);
+
+  /**
+   * Reads the current value of {@code layerPointReader} as a double.
+   *
+   * @throws QueryProcessException if the reader's type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  private static double castCurrentValueToDoubleOperand(LayerPointReader layerPointReader)
+      throws IOException, QueryProcessException {
+    final TSDataType dataType = layerPointReader.getDataType();
+    switch (dataType) {
+      case INT32:
+        return layerPointReader.currentInt();
+      case INT64:
+        return layerPointReader.currentLong();
+      case FLOAT:
+        return layerPointReader.currentFloat();
+      case DOUBLE:
+        return layerPointReader.currentDouble();
+      default:
+        throw new QueryProcessException("Unsupported data type: " + dataType);
+    }
+  }
+
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.DOUBLE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java
new file mode 100644
index 0000000..c603003
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticDivisionTransformer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose result is the quotient of its two operands. */
+public class ArithmeticDivisionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * Creates a division transformer over two operand readers.
+   *
+   * @param leftPointReader reader handed to the superclass as the left operand (dividend)
+   * @param rightPointReader reader handed to the superclass as the right operand (divisor)
+   */
+  public ArithmeticDivisionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Divides the left operand by the right one using double arithmetic.
+   *
+   * @param leftOperand the dividend
+   * @param rightOperand the divisor; zero yields an infinity or NaN instead of an exception
+   * @return {@code leftOperand / rightOperand}
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double quotient = leftOperand / rightOperand;
+    return quotient;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java
new file mode 100644
index 0000000..561974a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticModuloTransformer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose result is the remainder of dividing its two operands. */
+public class ArithmeticModuloTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * Creates a modulo transformer over two operand readers.
+   *
+   * @param leftPointReader reader handed to the superclass as the left operand (dividend)
+   * @param rightPointReader reader handed to the superclass as the right operand (divisor)
+   */
+  public ArithmeticModuloTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Computes the remainder with Java's {@code %} operator on doubles.
+   *
+   * @param leftOperand the dividend
+   * @param rightOperand the divisor; zero yields NaN instead of an exception
+   * @return {@code leftOperand % rightOperand}
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double remainder = leftOperand % rightOperand;
+    return remainder;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java
new file mode 100644
index 0000000..566108b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticMultiplicationTransformer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose result is the product of its two operands. */
+public class ArithmeticMultiplicationTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * Creates a multiplication transformer over two operand readers.
+   *
+   * @param leftPointReader reader handed to the superclass as the left operand
+   * @param rightPointReader reader handed to the superclass as the right operand
+   */
+  public ArithmeticMultiplicationTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Multiplies the two operands.
+   *
+   * @param leftOperand value on the left of {@code *}
+   * @param rightOperand value on the right of {@code *}
+   * @return {@code leftOperand * rightOperand}
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double product = leftOperand * rightOperand;
+    return product;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java
new file mode 100644
index 0000000..46a6063
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticNegationTransformer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * Transformer that negates every point of its input reader. The output keeps the input's
+ * timestamps and data type.
+ */
+public class ArithmeticNegationTransformer extends Transformer {
+
+  private final LayerPointReader layerPointReader;
+
+  /** @param layerPointReader reader whose values are negated */
+  public ArithmeticNegationTransformer(LayerPointReader layerPointReader) {
+    this.layerPointReader = layerPointReader;
+  }
+
+  /**
+   * Caches the timestamp and the negated value of the reader's next point.
+   *
+   * @return false if the reader has no further point
+   * @throws QueryProcessException if the reader's type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  @Override
+  protected boolean cacheValue() throws QueryProcessException, IOException {
+    if (layerPointReader.next()) {
+      cachedTime = layerPointReader.currentTime();
+      cacheNegatedCurrentValue();
+      layerPointReader.readyForNext();
+      return true;
+    }
+    return false;
+  }
+
+  /** Stores the negated current value in the cache field matching the reader's data type. */
+  private void cacheNegatedCurrentValue() throws QueryProcessException, IOException {
+    final TSDataType dataType = layerPointReader.getDataType();
+    switch (dataType) {
+      case INT32:
+        cachedInt = -layerPointReader.currentInt();
+        break;
+      case INT64:
+        cachedLong = -layerPointReader.currentLong();
+        break;
+      case FLOAT:
+        cachedFloat = -layerPointReader.currentFloat();
+        break;
+      case DOUBLE:
+        cachedDouble = -layerPointReader.currentDouble();
+        break;
+      default:
+        throw new QueryProcessException("Unsupported data type: " + dataType);
+    }
+  }
+
+  /** Returns the data type of the wrapped reader, since negation does not change it. */
+  @Override
+  public TSDataType getDataType() {
+    return layerPointReader.getDataType();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java
new file mode 100644
index 0000000..5ce2a58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/ArithmeticSubtractionTransformer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.transformer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+/** Binary arithmetic transformer whose result is the difference of its two operands. */
+public class ArithmeticSubtractionTransformer extends ArithmeticBinaryTransformer {
+
+  /**
+   * Creates a subtraction transformer over two operand readers.
+   *
+   * @param leftPointReader reader handed to the superclass as the left operand (minuend)
+   * @param rightPointReader reader handed to the superclass as the right operand (subtrahend)
+   */
+  public ArithmeticSubtractionTransformer(
+      LayerPointReader leftPointReader, LayerPointReader rightPointReader) {
+    super(leftPointReader, rightPointReader);
+  }
+
+  /**
+   * Subtracts the right operand from the left one.
+   *
+   * @param leftOperand the minuend
+   * @param rightOperand the subtrahend
+   * @return {@code leftOperand - rightOperand}
+   */
+  @Override
+  protected double evaluate(double leftOperand, double rightOperand) {
+    final double difference = leftOperand - rightOperand;
+    return difference;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f09454e..19fa259 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,7 +83,6 @@ import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -1054,13 +1053,7 @@ public class TSServiceImpl implements TSIService.Iface {
         UDTFPlan udtfPlan = (UDTFPlan) plan;
         for (int i = 0; i < paths.size(); i++) {
           respColumns.add(resultColumns.get(i).getResultColumnName());
-          seriesTypes.add(
-              resultColumns.get(i).getExpression() instanceof TimeSeriesOperand
-                  ? udtfPlan.getDataTypes().get(i)
-                  : udtfPlan
-                      .getExecutorByOriginalOutputColumnIndex(i)
-                      .getConfigurations()
-                      .getOutputDataType());
+          seriesTypes.add(udtfPlan.getOriginalOutputColumnDataType(i));
         }
         break;
       default:
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java
new file mode 100644
index 0000000..840b595
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBArithmeticIT.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for arithmetic expressions ({@code + - * / %} and unary negation) in SELECT
+ * clauses. Every test opens its own JDBC connection and closes it when done.
+ */
+public class IoTDBArithmeticIT {
+
+  /** Tolerance used when comparing floating point results. */
+  private static final double E = 0.0001;
+
+  private static final String URL = Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/";
+
+  /**
+   * Rows 1..5 of root.sg.d1. s1-s6 are written at every timestamp; s7 is written at 1, 3, 5 and s8
+   * at 2, 4, 5, so s7 and s8 only share timestamp 5.
+   */
+  private static final String[] INSERTION_SQLS = {
+    "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7) values (1, 1, 1, 1, 1, false, '1', 1)",
+    "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s8) values (2, 2, 2, 2, 2, false, '2', 2)",
+    "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7) values (3, 3, 3, 3, 3, true, '3', 3)",
+    "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s8) values (4, 4, 4, 4, 4, true, '4', 4)",
+    "insert into root.sg.d1(time, s1, s2, s3, s4, s5, s6, s7, s8) values (5, 5, 5, 5, 5, true, '5', 5, 5)",
+  };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    createTimeSeries();
+    generateData();
+  }
+
+  /** Registers s1..s8: one series per numeric type, a BOOLEAN, a TEXT and two more INT32. */
+  private static void createTimeSeries() throws MetadataException {
+    IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));
+    createPlainTimeSeries("s1", TSDataType.INT32);
+    createPlainTimeSeries("s2", TSDataType.INT64);
+    createPlainTimeSeries("s3", TSDataType.FLOAT);
+    createPlainTimeSeries("s4", TSDataType.DOUBLE);
+    createPlainTimeSeries("s5", TSDataType.BOOLEAN);
+    createPlainTimeSeries("s6", TSDataType.TEXT);
+    createPlainTimeSeries("s7", TSDataType.INT32);
+    createPlainTimeSeries("s8", TSDataType.INT32);
+  }
+
+  /** Creates an uncompressed, PLAIN-encoded series {@code root.sg.d1.<measurement>}. */
+  private static void createPlainTimeSeries(String measurement, TSDataType dataType)
+      throws MetadataException {
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.sg.d1." + measurement),
+        dataType,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+  }
+
+  /** Opens a JDBC connection to the local test server as root; the caller must close it. */
+  private static Connection openConnection() throws SQLException {
+    return DriverManager.getConnection(URL, "root", "root");
+  }
+
+  private static void generateData() {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      for (String dataGenerationSql : INSERTION_SQLS) {
+        statement.execute(dataGenerationSql);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /** Checks every binary operator over all 16 operand pairs drawn from s1..s4. */
+  @Test
+  public void testArithmeticBinary() {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      String[] operands = new String[] {"s1", "s2", "s3", "s4"};
+      for (String operator : new String[] {" + ", " - ", " * ", " / ", " % "}) {
+        List<String> expressions = new ArrayList<>();
+        for (String leftOperand : operands) {
+          for (String rightOperand : operands) {
+            expressions.add(leftOperand + operator + rightOperand);
+          }
+        }
+        String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+
+        try (ResultSet resultSet = statement.executeQuery(sql)) {
+          assertEquals(1 + expressions.size(), resultSet.getMetaData().getColumnCount());
+
+          for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+            assertTrue(resultSet.next());
+            // s1..s4 all hold the value i at timestamp i, so every column shares one expectation
+            double expected = expectedResult(operator, i);
+            for (int j = 0; j < expressions.size(); ++j) {
+              double actual = Double.parseDouble(resultSet.getString(2 + j));
+              assertEquals(expected, actual, E);
+            }
+          }
+        }
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /** Returns the expected value of applying {@code operator} to {@code operand} and itself. */
+  private static double expectedResult(String operator, double operand) {
+    switch (operator) {
+      case " + ":
+        return operand + operand;
+      case " - ":
+        return operand - operand;
+      case " * ":
+        return operand * operand;
+      case " / ":
+        return operand / operand;
+      case " % ":
+        return operand % operand;
+      default:
+        throw new IllegalArgumentException("Unknown operator: " + operator);
+    }
+  }
+
+  @Test
+  public void testArithmeticUnary() {
+    String[] expressions = new String[] {"- s1", "- s2", "- s3", "- s4"};
+    String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(1 + expressions.length, resultSet.getMetaData().getColumnCount());
+
+      for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+        assertTrue(resultSet.next());
+        for (int j = 0; j < expressions.length; ++j) {
+          double expected = -i;
+          double actual = Double.parseDouble(resultSet.getString(2 + j));
+          assertEquals(expected, actual, E);
+        }
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testHybridQuery() {
+    String[] expressions = new String[] {"s1", "s1 + s2", "sin(s1)"};
+    String sql = String.format("select %s from root.sg.d1", String.join(",", expressions));
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(1 + expressions.length, resultSet.getMetaData().getColumnCount());
+
+      for (int i = 1; i < INSERTION_SQLS.length + 1; ++i) {
+        assertTrue(resultSet.next());
+        assertEquals(i, Double.parseDouble(resultSet.getString(2)), E);
+        assertEquals(i + i, Double.parseDouble(resultSet.getString(3)), E);
+        assertEquals(Math.sin(i), Double.parseDouble(resultSet.getString(4)), E);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /** s7 and s8 only share timestamp 5, so {@code s7 + s8} is expected to yield a single row. */
+  @Test
+  public void testNonAlign() {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery("select s7 + s8 from root.sg.d1")) {
+        assertEquals(1 + 1, resultSet.getMetaData().getColumnCount());
+        assertTrue(resultSet.next());
+        assertEquals(10, Double.parseDouble(resultSet.getString(2)), E);
+        assertFalse(resultSet.next());
+      }
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select s7 + s8 from root.sg.d1 where time < 5")) {
+        assertEquals(1 + 1, resultSet.getMetaData().getColumnCount());
+        assertFalse(resultSet.next());
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testWrongTypeBoolean() {
+    assertQueryFails("select s1 + s5 from root.sg.d1", "Unsupported data type: BOOLEAN");
+  }
+
+  @Test
+  public void testWrongTypeText() {
+    assertQueryFails("select s1 + s6 from root.sg.d1", "Unsupported data type: TEXT");
+  }
+
+  /**
+   * Runs {@code sql} and asserts that it raises an {@link SQLException} whose message contains
+   * {@code expectedMessage}; the test fails if the query completes normally.
+   */
+  private static void assertQueryFails(String sql, String expectedMessage) {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      // drain the result so that an error raised while fetching rows surfaces as well
+      while (resultSet.next()) {
+        // rows are irrelevant; only the failure matters
+      }
+      fail("Expected query to fail: " + sql);
+    } catch (SQLException throwable) {
+      assertTrue(throwable.getMessage().contains(expectedMessage));
+    }
+  }
+}